You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/31 04:43:56 UTC
[19/62] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
deleted file mode 100644
index ec1f26d..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
-import java.net.InetSocketAddress;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class ClusterServicesBroadcasterTest {
-
- private ClusterServicesBroadcaster broadcaster;
-
- private MulticastProtocolListener listener;
-
- private DummyProtocolHandler handler;
-
- private InetSocketAddress multicastAddress;
-
- private DiscoverableService broadcastedService;
-
- private ProtocolContext protocolContext;
-
- private MulticastConfiguration configuration;
-
- @Before
- public void setup() throws Exception {
-
- broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
-
- multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
-
- configuration = new MulticastConfiguration();
-
- protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
- broadcaster.addService(broadcastedService);
-
- handler = new DummyProtocolHandler();
- listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
- listener.addHandler(handler);
- }
-
- @After
- public void teardown() {
-
- if(broadcaster.isRunning()) {
- broadcaster.stop();
- }
-
- try {
- if(listener.isRunning()) {
- listener.stop();
- }
- } catch(Exception ex) {
- ex.printStackTrace(System.out);
- }
-
- }
-
- @Ignore("fails needs to be fixed")
- @Test
- public void testBroadcastReceived() throws Exception {
-
- broadcaster.start();
- listener.start();
-
- Thread.sleep(1000);
-
- listener.stop();
-
- assertNotNull(handler.getProtocolMessage());
- assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
- final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
- assertEquals(broadcastedService.getServiceName(), msg.getServiceName());
- assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
- assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
- }
-
- private class DummyProtocolHandler implements ProtocolHandler {
-
- private ProtocolMessage protocolMessage;
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- this.protocolMessage = msg;
- return null;
- }
-
- public ProtocolMessage getProtocolMessage() {
- return protocolMessage;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
deleted file mode 100644
index 4233d88..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class MulticastProtocolListenerTest {
-
- private MulticastProtocolListener listener;
-
- private MulticastSocket socket;
-
- private InetSocketAddress address;
-
- private MulticastConfiguration configuration;
-
- private ProtocolContext protocolContext;
-
- @Before
- public void setup() throws Exception {
-
- address = new InetSocketAddress("226.1.1.1", 60000);
- configuration = new MulticastConfiguration();
-
- protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
- listener.start();
-
- socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
- }
-
- @After
- public void teardown() throws IOException {
- try {
- if(listener.isRunning()) {
- listener.stop();
- }
- } finally {
- MulticastUtils.closeQuietly(socket);
- }
- }
-
- @Ignore("Test needs to be reworked. Fails if on a system without actiev network connection")
- @Test
- public void testBadRequest() throws Exception {
- DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
- listener.addHandler(handler);
- DatagramPacket packet = new DatagramPacket(new byte[] {5}, 1, address);
- socket.send(packet);
- Thread.sleep(250);
- assertEquals(0, handler.getMessages().size());
- }
-
- @Ignore("this test works sometimes and fails others - needs work to be reliable")
- @Test
- public void testRequest() throws Exception {
-
- ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
- listener.addHandler(handler);
-
- ProtocolMessage msg = new PingMessage();
- MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
-
- // marshal message to output stream
- ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- marshaller.marshal(multicastMsg, baos);
- byte[] requestPacketBytes = baos.toByteArray();
- DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
- socket.send(packet);
-
- Thread.sleep(250);
- assertEquals(1, handler.getMessages().size());
- assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-
- }
-
- private class ReflexiveProtocolHandler implements ProtocolHandler {
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- messages.add(msg);
- return msg;
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
- }
-
- private class DelayedProtocolHandler implements ProtocolHandler {
-
- private int delay = 0;
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- public DelayedProtocolHandler(int delay) {
- this.delay = delay;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- try {
- messages.add(msg);
- Thread.sleep(delay);
- return null;
- } catch(final InterruptedException ie) {
- throw new ProtocolException(ie);
- }
-
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
deleted file mode 100644
index 1c5ba9e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
-import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
-import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.UUID;
-
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * @author unattributed
- */
-@Ignore("Randomly tests... probably timing-specific")
-public class NodeProtocolSenderImplTest {
-
- private SocketProtocolListener listener;
-
- private NodeProtocolSenderImpl sender;
-
- private DiscoverableService service;
-
- private ServerSocketConfiguration serverSocketConfiguration;
-
- private ClusterServiceLocator mockServiceLocator;
-
- private ProtocolHandler mockHandler;
-
- private NodeIdentifier nodeIdentifier;
-
- @Before
- public void setup() throws IOException {
-
- serverSocketConfiguration = new ServerSocketConfiguration();
-
- mockServiceLocator = mock(ClusterServiceLocator.class);
- mockHandler = mock(ProtocolHandler.class);
-
- nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
-
- ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
- listener.setShutdownListenerSeconds(3);
- listener.addHandler(mockHandler);
- listener.start();
-
- service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort()));
-
- SocketConfiguration socketConfiguration = new SocketConfiguration();
- socketConfiguration.setReuseAddress(true);
- sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext);
- }
-
- @After
- public void teardown() throws IOException {
- if(listener.isRunning()) {
- listener.stop();
- }
- }
-
- @Test
- public void testConnect() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
- mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
- ConnectionResponseMessage response = sender.requestConnection(request);
- assertNotNull(response);
- }
-
- @Test(expected = UnknownServiceAddressException.class)
- public void testConnectNoClusterManagerAddress() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(null);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage());
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
- }
-
- @Test(expected = ProtocolException.class)
- public void testConnectBadResponse() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
-
- }
-
- @Test(expected = ProtocolException.class)
- public void testConnectDelayedResponse() throws Exception {
-
- final int time = 250;
- sender.getSocketConfiguration().setSocketTimeout(time);
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() {
- @Override
- public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable {
- Thread.sleep(time * 3);
- return new ConnectionResponseMessage();
- }
- });
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
-
- }
-
- @Test
- public void testHeartbeat() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
- HeartbeatMessage hb = new HeartbeatMessage();
- hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[] {1, 2, 3}));
- sender.heartbeat(hb);
- }
-
- @Test
- public void testNotifyControllerStartupFailure() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
- ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
- msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
- msg.setExceptionMessage("some exception");
- sender.notifyControllerStartupFailure(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
deleted file mode 100644
index 07ee83a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/DelayedProtocolHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class DelayedProtocolHandler implements ProtocolHandler {
-
- private int delay = 0;
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- public DelayedProtocolHandler(int delay) {
- this.delay = delay;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- try {
- messages.add(msg);
- Thread.sleep(delay);
- return null;
- } catch (final InterruptedException ie) {
- throw new ProtocolException(ie);
- }
-
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
deleted file mode 100644
index 4e3b932..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class ReflexiveProtocolHandler implements ProtocolHandler {
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- messages.add(msg);
- return msg;
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/.gitignore b/nifi/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/pom.xml b/nifi/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
deleted file mode 100644
index f6d9d2a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/pom.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-framework-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>framework-cluster-web</artifactId>
- <packaging>jar</packaging>
- <name>NiFi Framework Cluster Web</name>
- <description>The clustering software for communicating with the NiFi web api.</description>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>web-optimistic-locking</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-administration</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-user-actions</artifactId>
- </dependency>
-
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
deleted file mode 100644
index 44fb25a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContext.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.web.Revision;
-
-/**
- * Contains contextual information about clustering that may be serialized
- * between manager and node when communicating over HTTP.
- */
-public interface ClusterContext extends Serializable {
-
- /**
- * Returns a list of auditable actions. The list is modifiable
- * and will never be null.
- * @return a collection of actions
- */
- List<Action> getActions();
-
- Revision getRevision();
-
- void setRevision(Revision revision);
-
- /**
- * @return true if the request was sent by the cluster manager; false otherwise
- */
- boolean isRequestSentByClusterManager();
-
- /**
- * Sets the flag to indicate if a request was sent by the cluster manager.
- * @param flag true if the request was sent by the cluster manager; false otherwise
- */
- void setRequestSentByClusterManager(boolean flag);
-
- /**
- * Gets an id generation seed. This is used to ensure that nodes are able to generate the
- * same id across the cluster. This is usually handled by the cluster manager creating the
- * id, however for some actions (snippets, templates, etc) this is not possible.
- * @return
- */
- String getIdGenerationSeed();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
deleted file mode 100644
index 06907d2..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import org.apache.nifi.action.Action;
-import org.apache.nifi.web.Revision;
-
-/**
- * Plain field-backed implementation of {@link ClusterContext}.
- *
- * The action list and the id generation seed are created once per instance
- * and never replaced; the revision and the "sent by cluster manager" flag are
- * simple mutable properties.
- */
-public class ClusterContextImpl implements ClusterContext, Serializable {
-
-    /** Random UUID string generated when the instance is constructed. */
-    private final String idGenerationSeed = UUID.randomUUID().toString();
-
-    /** Backing list handed out directly by {@link #getActions()}. */
-    private final List<Action> actions = new ArrayList<>();
-
-    /** Defaults to false until explicitly set. */
-    private boolean requestSentByClusterManager;
-
-    /** Remains null until {@link #setRevision(Revision)} is called. */
-    private Revision revision;
-
-    /**
-     * @return the live, modifiable list of actions (not a copy)
-     */
-    @Override
-    public List<Action> getActions() {
-        return this.actions;
-    }
-
-    /**
-     * @return the seed generated for this instance
-     */
-    @Override
-    public String getIdGenerationSeed() {
-        return idGenerationSeed;
-    }
-
-    /**
-     * @return the stored revision, or null if none has been set
-     */
-    @Override
-    public Revision getRevision() {
-        return this.revision;
-    }
-
-    /**
-     * @param revision the revision to store
-     */
-    @Override
-    public void setRevision(Revision revision) {
-        this.revision = revision;
-    }
-
-    /**
-     * @return the flag last stored by {@link #setRequestSentByClusterManager(boolean)}
-     */
-    @Override
-    public boolean isRequestSentByClusterManager() {
-        return this.requestSentByClusterManager;
-    }
-
-    /**
-     * @param requestSentByClusterManager the flag value to store
-     */
-    @Override
-    public void setRequestSentByClusterManager(boolean requestSentByClusterManager) {
-        this.requestSentByClusterManager = requestSentByClusterManager;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
deleted file mode 100644
index 012e7c7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.context;
-
-/**
- * Static accessors for a per-thread {@link ClusterContext}.
- */
-public class ClusterContextThreadLocal {
-
-    /** Holds at most one context per thread; empty until set or lazily populated. */
-    private static final ThreadLocal<ClusterContext> CONTEXT_HOLDER = new ThreadLocal<>();
-
-    /**
-     * Creates a fresh context without binding it to any thread.
-     *
-     * @return a new, empty context
-     */
-    public static ClusterContext createEmptyContext() {
-        return new ClusterContextImpl();
-    }
-
-    /**
-     * Returns the calling thread's context. When the thread has none (or a
-     * null context was stored), a new empty context is created, bound to the
-     * thread, and returned.
-     *
-     * @return the context bound to the calling thread; never null
-     */
-    public static ClusterContext getContext() {
-        final ClusterContext existing = CONTEXT_HOLDER.get();
-        if (existing != null) {
-            return existing;
-        }
-
-        final ClusterContext created = createEmptyContext();
-        CONTEXT_HOLDER.set(created);
-        return created;
-    }
-
-    /**
-     * Binds the given context to the calling thread, replacing any previous one.
-     *
-     * @param context the context to bind
-     */
-    public static void setContext(final ClusterContext context) {
-        CONTEXT_HOLDER.set(context);
-    }
-
-    /**
-     * Discards whatever context is bound to the calling thread.
-     */
-    public static void removeContext() {
-        CONTEXT_HOLDER.remove();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
deleted file mode 100644
index 90b8a37..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web;
-
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-
-/**
- * An {@link OptimisticLockingManager} for clustered environments. Revisions
- * are read from, and mirrored into, the {@link ClusterContext} bound to the
- * current thread; a wrapped manager supplies the revision when the context
- * carries none and is the only holder of the last modifier.
- *
- * @author unattributed
- */
-public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager {
-
-    /** The wrapped, non-cluster-aware manager. */
-    private final OptimisticLockingManager delegate;
-
-    /**
-     * @param optimisticLockingManager the manager to wrap
-     */
-    public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) {
-        this.delegate = optimisticLockingManager;
-    }
-
-    /**
-     * Verifies the given revision against the current one.
-     *
-     * @param revision the revision supplied by the caller
-     * @return the given revision incremented under its own client id
-     * @throws InvalidRevisionException if the given revision does not equal
-     * the current revision
-     */
-    @Override
-    public Revision checkRevision(Revision revision) throws InvalidRevisionException {
-        final Revision current = getRevision();
-        if (!current.equals(revision)) {
-            throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, current));
-        }
-        return revision.increment(revision.getClientId());
-    }
-
-    /**
-     * @param revision the revision to compare
-     * @return true if the current revision equals the given one
-     */
-    @Override
-    public boolean isCurrent(Revision revision) {
-        final Revision current = getRevision();
-        return current.equals(revision);
-    }
-
-    /**
-     * @return the revision held by the thread's cluster context when it has
-     * one; otherwise the wrapped manager's revision
-     */
-    @Override
-    public Revision getRevision() {
-        final ClusterContext context = ClusterContextThreadLocal.getContext();
-        final boolean contextHasRevision = context != null && context.getRevision() != null;
-        return contextHasRevision ? context.getRevision() : delegate.getRevision();
-    }
-
-    /**
-     * Stores the revision in the thread's cluster context (when present) and
-     * then in the wrapped manager.
-     *
-     * @param revision the revision to store
-     */
-    @Override
-    public void setRevision(final Revision revision) {
-        final ClusterContext context = ClusterContextThreadLocal.getContext();
-        if (context != null) {
-            context.setRevision(revision);
-        }
-        delegate.setRevision(revision);
-    }
-
-    /**
-     * Increments the current revision and stores the result.
-     *
-     * @return the incremented revision
-     */
-    @Override
-    public Revision incrementRevision() {
-        final Revision next = getRevision().increment();
-        setRevision(next);
-        return next;
-    }
-
-    /**
-     * Increments the current revision under the given client id and stores
-     * the result.
-     *
-     * @param clientId the client id passed to the increment
-     * @return the incremented revision
-     */
-    @Override
-    public Revision incrementRevision(final String clientId) {
-        final Revision next = getRevision().increment(clientId);
-        setRevision(next);
-        return next;
-    }
-
-    /**
-     * @return the wrapped manager's last modifier
-     */
-    @Override
-    public String getLastModifier() {
-        return delegate.getLastModifier();
-    }
-
-    /**
-     * @param lastModifier the value forwarded to the wrapped manager
-     */
-    @Override
-    public void setLastModifier(final String lastModifier) {
-        delegate.setLastModifier(lastModifier);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/.gitignore b/nifi/nar-bundles/framework-bundle/framework/cluster/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/pom.xml b/nifi/nar-bundles/framework-bundle/framework/cluster/pom.xml
deleted file mode 100644
index a06ef94..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-framework-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>framework-cluster</artifactId>
- <packaging>jar</packaging>
- <name>NiFi Framework Cluster</name>
- <description>The clustering software for NiFi.</description>
- <dependencies>
-
- <!-- application core dependencies -->
-
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-logging-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>client-dto</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>framework-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>core-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>framework-cluster-protocol</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>framework-cluster-web</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-web-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-administration</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>site-to-site</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
-
- <!-- third party dependencies -->
-
- <!-- sun dependencies -->
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
- </dependency>
-
- <!-- commons dependencies -->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </dependency>
-
- <!-- jersey dependencies -->
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </dependency>
-
- <!-- spring dependencies -->
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-beans</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
-
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
deleted file mode 100644
index 0b70c61..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/client/MulticastTestClient.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.client;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple multicast test client that sends ping messages to a group address.
- *
- * The group and port come from the system properties {@code group} (default
- * 225.0.0.0) and {@code port} (default 2222). The client listens on the group,
- * logs every ping it receives, broadcasts a ping every
- * {@link #PING_DELAY_SECONDS} seconds, and stops once any input arrives on
- * standard input.
- */
-public class MulticastTestClient {
-
-    private static final Logger logger = LoggerFactory.getLogger(MulticastTestClient.class);
-
-    private static final int PING_DELAY_SECONDS = 3;
-
-    /** Largest port number accepted by {@link InetSocketAddress}. */
-    private static final int MAX_PORT = 65535;
-
-    /**
-     * Runs the test client until a byte is read from standard input.
-     *
-     * @param args ignored; configuration is read from system properties
-     * @throws IOException if the multicast socket cannot be created or
-     * standard input cannot be read
-     */
-    public static void main(final String... args) throws IOException {
-
-        // getProperty with a default never returns null, so only blankness needs checking
-        final String group = System.getProperty("group", "225.0.0.0").trim();
-        if (group.length() == 0) {
-            System.out.println("Host system property 'group' must be non-empty.");
-            return;
-        }
-
-        final String portStr = System.getProperty("port", "2222");
-        final int port;
-        try {
-            port = Integer.parseInt(portStr);
-        } catch (final NumberFormatException nfe) {
-            System.out.println("Port system property 'port' was not a valid port.");
-            return;
-        }
-        // InetSocketAddress throws IllegalArgumentException outside this range;
-        // report it the same way as an unparsable port instead of crashing
-        if (port < 0 || port > MAX_PORT) {
-            System.out.println("Port system property 'port' was not a valid port.");
-            return;
-        }
-
-        logger.info(String.format("Pinging every %s seconds using multicast address: %s:%s.", PING_DELAY_SECONDS, group, port));
-        logger.info("Override defaults by using system properties '-Dgroup=<Class D IP>' and '-Dport=<unused port>'.");
-        logger.info("The test client may be stopped by entering a newline at the command line.");
-
-        final InetSocketAddress addr = new InetSocketAddress(group, port);
-        final ProtocolContext<ProtocolMessage> protocolContext = new JaxbProtocolContext<ProtocolMessage>(JaxbProtocolUtils.JAXB_CONTEXT);
-        final MulticastConfiguration multicastConfig = new MulticastConfiguration();
-        multicastConfig.setReuseAddress(true);
-
-        // setup listener
-        final MulticastProtocolListener listener = new MulticastProtocolListener(1, addr, multicastConfig, protocolContext);
-        listener.addHandler(new ProtocolHandler() {
-            @Override
-            public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
-                final PingMessage pingMsg = (PingMessage) msg;
-                // SimpleDateFormat is not thread-safe, so a fresh one is used per message
-                final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss", Locale.US);
-                logger.info("Pinged at: " + sdf.format(pingMsg.getDate()));
-                return null;
-            }
-
-            @Override
-            public boolean canHandle(ProtocolMessage msg) {
-                return true;
-            }
-        });
-
-        // setup socket; try-with-resources guarantees it is closed on every exit path
-        try (final MulticastSocket multicastSocket = MulticastUtils.createMulticastSocket(multicastConfig)) {
-
-            // setup broadcaster
-            final Timer broadcaster = new Timer("Multicast Test Client", /* isDaemon */ true);
-
-            try {
-
-                // start listening
-                listener.start();
-
-                // start broadcasting
-                broadcaster.schedule(new TimerTask() {
-
-                    @Override
-                    public void run() {
-                        try {
-
-                            final PingMessage msg = new PingMessage();
-                            msg.setDate(new Date());
-
-                            // marshal message to output stream
-                            final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                            marshaller.marshal(msg, baos);
-                            final byte[] packetBytes = baos.toByteArray();
-
-                            // send message
-                            final DatagramPacket packet = new DatagramPacket(packetBytes, packetBytes.length, addr);
-                            multicastSocket.send(packet);
-
-                        } catch (final Exception ex) {
-                            // also reached if a ping is in flight while the socket is closed at shutdown
-                            logger.warn("Failed to send message due to: " + ex, ex);
-                        }
-                    }
-                }, 0, PING_DELAY_SECONDS * 1000);
-
-                // block until any input is received
-                System.in.read();
-
-            } finally {
-                // stop scheduling pings before the socket is closed
-                broadcaster.cancel();
-                if (listener.isRunning()) {
-                    listener.stop();
-                }
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
deleted file mode 100644
index 6bc5d6c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/Event.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event;
-
-import java.util.Date;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * A record of something noteworthy: where it came from, when it happened,
- * what happened, and which category it falls under. All fields are final and
- * assigned once by the constructor.
- *
- * @author unattributed
- *
- * @Immutable
- */
-public class Event {
-
-    /** The categories an event may be filed under. */
-    public static enum Category {
-
-        DEBUG,
-        INFO,
-        WARN
-    }
-
-    private final String source;
-
-    private final String message;
-
-    private final Category category;
-
-    private final long timestamp;
-
-    /**
-     * Creates an "INFO" event stamped with the current time.
-     *
-     * @param source the source
-     * @param message the description
-     */
-    public Event(final String source, final String message) {
-        this(source, message, Category.INFO);
-    }
-
-    /**
-     * Creates an event stamped with the current time.
-     *
-     * @param source the source
-     * @param message the description
-     * @param category the event category
-     */
-    public Event(final String source, final String message, final Category category) {
-        this(source, message, category, new Date().getTime());
-    }
-
-    /**
-     * Creates an "INFO" event with an explicit timestamp.
-     *
-     * @param source the source
-     * @param message the description
-     * @param timestamp the time of occurrence
-     */
-    public Event(final String source, final String message, final long timestamp) {
-        this(source, message, Category.INFO, timestamp);
-    }
-
-    /**
-     * Creates an event from fully explicit values.
-     *
-     * @param source the source; must not be blank
-     * @param message the description; must not be blank
-     * @param category the event category; must not be null
-     * @param timestamp the time of occurrence; must not be negative
-     * @throws IllegalArgumentException if any argument violates the
-     * constraints above
-     */
-    public Event(final String source, final String message, final Category category, final long timestamp) {
-
-        // arguments are validated in declaration order; the first violation wins
-        if (StringUtils.isBlank(source)) {
-            throw new IllegalArgumentException("Source may not be empty or null.");
-        }
-        if (StringUtils.isBlank(message)) {
-            throw new IllegalArgumentException("Event message may not be empty or null.");
-        }
-        if (category == null) {
-            throw new IllegalArgumentException("Event category may not be null.");
-        }
-        if (timestamp < 0) {
-            throw new IllegalArgumentException("Timestamp may not be negative: " + timestamp);
-        }
-
-        this.source = source;
-        this.message = message;
-        this.category = category;
-        this.timestamp = timestamp;
-    }
-
-    /**
-     * @return the source of the event
-     */
-    public String getSource() {
-        return source;
-    }
-
-    /**
-     * @return the description of the event
-     */
-    public String getMessage() {
-        return message;
-    }
-
-    /**
-     * @return the category of the event
-     */
-    public Category getCategory() {
-        return category;
-    }
-
-    /**
-     * @return the time of occurrence
-     */
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
deleted file mode 100644
index f9dfb00..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/EventManager.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event;
-
-import java.util.List;
-
-/**
- * Manages an ordered list of events. The event history size dictates the total
- * number of events to manage for a given source at a given time. When the size
- * is exceeded, the oldest event for that source is evicted.
- *
- * @author unattributed
- */
-public interface EventManager {
-
- /**
- * Adds an event to the manager.
- *
- * @param event an Event
- */
- void addEvent(Event event);
-
- /**
- * Returns a list of events for a given source sorted by the event's
- * timestamp where the most recent event is first in the list.
- *
- * @param eventSource the source
- *
- * @return the list of events
- */
- List<Event> getEvents(String eventSource);
-
- /**
- * Returns the most recent event for the source. If no events exist, then
- * null is returned.
- *
- * @param eventSource the source
- *
- * @return the most recent event, or null if the source has no events
- */
- Event getMostRecentEvent(String eventSource);
-
- /**
- * Clears all events for the given source.
- *
- * @param eventSource the source
- */
- void clearEventHistory(String eventSource);
-
- /**
- * Returns the history size.
- *
- * @return the history size
- */
- int getEventHistorySize();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
deleted file mode 100644
index 7fadc78..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/event/impl/EventManagerImpl.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.event.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import org.apache.nifi.cluster.event.Event;
-import org.apache.nifi.cluster.event.EventManager;
-
-/**
- * Implements the EventManager.
- *
- * Events are kept per source in a priority queue ordered by most recent
- * first, capped at the configured history size. This class performs no
- * synchronization of its own.
- *
- * @author unattributed
- */
-public class EventManagerImpl implements EventManager {
-
-    /**
-     * Orders events by most recent first. Long.compare is used rather than
-     * casting the timestamp difference to int, because that cast truncates
-     * and yields the wrong sign once two timestamps are more than
-     * Integer.MAX_VALUE milliseconds (roughly 24.8 days) apart.
-     */
-    private static final Comparator<Event> MOST_RECENT_FIRST = new Comparator<Event>() {
-        @Override
-        public int compare(final Event o1, final Event o2) {
-            return Long.compare(o2.getTimestamp(), o1.getTimestamp());
-        }
-    };
-
-    /**
-     * associates the source ID with an ordered queue of events, ordered by most
-     * recent event
-     */
-    private final Map<String, Queue<Event>> eventsMap = new HashMap<>();
-
-    /**
-     * the number of events to maintain for a given source
-     */
-    private final int eventHistorySize;
-
-    /**
-     * Creates an instance.
-     *
-     * @param eventHistorySize the number of events to manage for a given
-     * source. Value must be positive.
-     * @throws IllegalArgumentException if the history size is not positive
-     */
-    public EventManagerImpl(final int eventHistorySize) {
-        if (eventHistorySize <= 0) {
-            throw new IllegalArgumentException("Event history size must be positive: " + eventHistorySize);
-        }
-        this.eventHistorySize = eventHistorySize;
-    }
-
-    /**
-     * Adds the event to its source's history, evicting that source's oldest
-     * event when the history size is exceeded.
-     *
-     * @param event the event to add
-     * @throws IllegalArgumentException if the event is null
-     */
-    @Override
-    public void addEvent(final Event event) {
-
-        if (event == null) {
-            throw new IllegalArgumentException("Event may not be null.");
-        }
-
-        Queue<Event> events = eventsMap.get(event.getSource());
-        if (events == null) {
-            // no events from this source, so add a new queue to the map
-            events = new PriorityQueue<>(eventHistorySize, MOST_RECENT_FIRST);
-            eventsMap.put(event.getSource(), events);
-        }
-
-        // add event
-        events.add(event);
-
-        // if we exceeded the history size, then evict the oldest event
-        if (events.size() > eventHistorySize) {
-            removeOldestEvent(events);
-        }
-
-    }
-
-    /**
-     * Returns an unmodifiable snapshot of the source's events, most recent
-     * first.
-     *
-     * @param eventSource the source
-     * @return the events for the source, or an empty list if it has none
-     */
-    @Override
-    public List<Event> getEvents(final String eventSource) {
-        final Queue<Event> events = eventsMap.get(eventSource);
-        if (events == null) {
-            return Collections.emptyList();
-        }
-
-        // a PriorityQueue iterates in heap order, not sorted order, so the
-        // copy must be sorted explicitly to honor "most recent first"
-        final List<Event> sortedEvents = new ArrayList<>(events);
-        Collections.sort(sortedEvents, MOST_RECENT_FIRST);
-        return Collections.unmodifiableList(sortedEvents);
-    }
-
-    @Override
-    public int getEventHistorySize() {
-        return eventHistorySize;
-    }
-
-    /**
-     * @param eventSource the source
-     * @return the head of the source's queue (its most recent event), or
-     * null if the source has no events
-     */
-    @Override
-    public Event getMostRecentEvent(final String eventSource) {
-        final Queue<Event> events = eventsMap.get(eventSource);
-        if (events == null) {
-            return null;
-        } else {
-            return events.peek();
-        }
-    }
-
-    @Override
-    public void clearEventHistory(final String eventSource) {
-        eventsMap.remove(eventSource);
-    }
-
-    /**
-     * Removes the event with the smallest timestamp from the collection; on a
-     * tie, the first such event in iteration order is removed.
-     *
-     * @param events the events to trim
-     */
-    private void removeOldestEvent(final Collection<Event> events) {
-
-        if (events.isEmpty()) {
-            return;
-        }
-
-        Event oldestEvent = null;
-        for (final Event event : events) {
-            if (oldestEvent == null || oldestEvent.getTimestamp() > event.getTimestamp()) {
-                oldestEvent = event;
-            }
-        }
-
-        events.remove(oldestEvent);
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
deleted file mode 100644
index 2e3d278..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/ClusterNodeFirewall.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall;
-
-/**
- * Defines the interface for restricting external client connections to a set of
- * hosts or IPs.
- */
-public interface ClusterNodeFirewall {
-
- /**
- * Returns true if the given host or IP is permissible through the firewall;
- * false otherwise.
- *
- * If an IP is given, then it must be formatted in dotted decimal notation.
- *
- * @param hostOrIp the host name or dotted-decimal IP address to check
- * @return true if the host or IP is permitted through the firewall; false
- * otherwise
- */
- boolean isPermissible(String hostOrIp);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
deleted file mode 100644
index 916ec14..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/firewall/impl/FileBasedClusterNodeFirewall.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.firewall.impl;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import org.apache.commons.net.util.SubnetUtils;
-import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
-import org.apache.nifi.util.file.FileUtils;
-import org.apache.nifi.logging.NiFiLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A file-based implementation of the {@link ClusterNodeFirewall} interface. The
- * class is configured with a file. If the file yields no rules, then everything
- * is permissible. Otherwise, the file should contain hostnames or IPs formatted
- * as dotted decimals with an optional CIDR suffix. Each entry must be separated
- * by a newline. An example configuration is given below:
- *
- * <pre>
- * # hash character is a comment delimiter
- * 1.2.3.4 # exact IP
- * some.host.name # a host name
- * 4.5.6.7/8 # range of CIDR IPs
- * 9.10.11.12/13 # a smaller range of CIDR IPs
- * </pre>
- *
- * This class allows for synchronization with an optionally configured restore
- * directory. If configured, then at startup, if either the config file or the
- * restore directory's copy is missing, then the configuration file will be
- * copied to the appropriate location. If the restore directory contains a copy
- * that is different in content to the configuration file, then an exception is
- * thrown at construction time.
- */
-public class FileBasedClusterNodeFirewall implements ClusterNodeFirewall {
-
-    private final File config;
-
-    private final File restoreDirectory;
-
-    /** Subnets parsed from the configuration file; empty means "permit everything". */
-    private final Collection<SubnetUtils.SubnetInfo> subnetInfos = new ArrayList<>();
-
-    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(FileBasedClusterNodeFirewall.class));
-
-    /**
-     * Creates a firewall from the given configuration file without a restore
-     * directory.
-     *
-     * @param config the firewall configuration file; created if it does not exist
-     * @throws IOException if the file cannot be created or read
-     * @throws IllegalArgumentException if config is null
-     */
-    public FileBasedClusterNodeFirewall(final File config) throws IOException {
-        this(config, null);
-    }
-
-    /**
-     * Creates a firewall from the given configuration file, first synchronizing
-     * it with the restore directory when one is given.
-     *
-     * @param config the firewall configuration file; created if it does not exist
-     * @param restoreDirectory optional directory holding a restore copy of the
-     * configuration file; may be null
-     * @throws IOException if the file cannot be created or read
-     * @throws IllegalArgumentException if config is null
-     * @throws IllegalStateException if the configuration file resides in the
-     * restore directory
-     * @throws RuntimeException wrapping any IOException raised while
-     * synchronizing with the restore directory
-     */
-    public FileBasedClusterNodeFirewall(final File config, final File restoreDirectory) throws IOException {
-
-        if (config == null) {
-            throw new IllegalArgumentException("Firewall configuration file may not be null.");
-        }
-
-        this.config = config;
-        this.restoreDirectory = restoreDirectory;
-
-        if (restoreDirectory != null) {
-            // synchronize with restore directory
-            try {
-                syncWithRestoreDirectory();
-            } catch (final IOException ioe) {
-                // NOTE(review): wrapped although the constructor declares IOException;
-                // kept so existing callers keep seeing the same exception type
-                throw new RuntimeException(ioe);
-            }
-        }
-
-        if (!config.exists() && !config.createNewFile()) {
-            throw new IOException("Firewall configuration file did not exist and could not be created: " + config.getAbsolutePath());
-        }
-
-        logger.info("Loading cluster firewall configuration.");
-        parseConfig(config);
-        logger.info("Cluster firewall configuration loaded.");
-    }
-
-    /**
-     * Checks the given host or IP against the configured subnets.
-     *
-     * @param hostOrIp the host name or dotted-decimal IP to check
-     * @return true if no rules are configured or the resolved address falls in
-     * one of the configured subnets; false if the host is missing, cannot be
-     * resolved, or matches no subnet
-     */
-    @Override
-    public boolean isPermissible(final String hostOrIp) {
-
-        // if no rules, then permit everything
-        if (subnetInfos.isEmpty()) {
-            return true;
-        }
-
-        // InetAddress.getByName resolves null and "" to the loopback address, which
-        // would let a missing host be evaluated (and possibly admitted) as loopback
-        if (hostOrIp == null || hostOrIp.isEmpty()) {
-            logger.warn("Blocking request with no host or IP specified.");
-            return false;
-        }
-
-        try {
-            final String ip = InetAddress.getByName(hostOrIp).getHostAddress();
-
-            // check each subnet to see if IP is in range
-            for (final SubnetUtils.SubnetInfo subnetInfo : subnetInfos) {
-                if (subnetInfo.isInRange(ip)) {
-                    return true;
-                }
-            }
-
-            // no match
-            return false;
-
-        } catch (final UnknownHostException uhe) {
-            logger.warn("Blocking unknown host: " + hostOrIp, uhe);
-            return false;
-        } catch (final IllegalArgumentException iae) {
-            // the address could not be evaluated against the subnets (presumably a
-            // non-IPv4 address); treat it as not permitted
-            return false;
-        }
-    }
-
-    /**
-     * Ensures the restore directory is usable and synchronizes the configuration
-     * file with its restore copy (same file name, inside the restore directory).
-     *
-     * @throws IOException if the directory check or the synchronization fails
-     * @throws IllegalStateException if the configuration file resides in the
-     * restore directory
-     */
-    private void syncWithRestoreDirectory() throws IOException {
-
-        // sanity check that restore directory is a directory, creating it if necessary
-        FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
-
-        // check that restore directory is not the same as the primary directory;
-        // resolve against the absolute file because getParentFile() is null for a
-        // bare relative file name
-        final File configDirectory = config.getAbsoluteFile().getParentFile();
-        if (configDirectory != null && configDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
-            throw new IllegalStateException(
-                    String.format("Cluster firewall configuration file '%s' cannot be in the restore directory '%s' ",
-                            config.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
-        }
-
-        // the restore copy will have same file name, but reside in a different directory
-        final File restoreFile = new File(restoreDirectory, config.getName());
-
-        // sync the primary copy with the restore copy
-        FileUtils.syncWithRestore(config, restoreFile, logger);
-
-    }
-
-    /**
-     * Replaces the current rules with those read from the given file. Blank
-     * lines and '#' comments are ignored; entries without a CIDR suffix are
-     * resolved and treated as a single address (/32). Unresolvable hosts and
-     * invalid CIDR entries are logged and skipped.
-     *
-     * @param config the configuration file to read
-     * @throws IOException if the file cannot be read
-     */
-    private void parseConfig(final File config) throws IOException {
-
-        // clear old information
-        subnetInfos.clear();
-        try (BufferedReader br = new BufferedReader(new FileReader(config))) {
-
-            String ipOrHostLine;
-            int totalIpsAdded = 0;
-            while ((ipOrHostLine = br.readLine()) != null) {
-
-                // cleanup whitespace
-                ipOrHostLine = ipOrHostLine.trim();
-
-                if (ipOrHostLine.isEmpty() || ipOrHostLine.startsWith("#")) {
-                    // skip empty lines or comments
-                    continue;
-                } else if (ipOrHostLine.contains("#")) {
-                    // parse out comments in IP containing lines
-                    ipOrHostLine = ipOrHostLine.substring(0, ipOrHostLine.indexOf("#")).trim();
-                }
-
-                // if given a complete IP, then convert to CIDR
-                String ipCidr;
-                if (ipOrHostLine.contains("/")) {
-                    ipCidr = ipOrHostLine;
-                } else if (ipOrHostLine.contains("\\")) {
-                    logger.warn("CIDR IP notation uses forward slashes '/'. Replacing backslash '\\' with forward slash'/' for '{}'", ipOrHostLine);
-                    ipCidr = ipOrHostLine.replace("\\", "/");
-                } else {
-                    try {
-                        final String resolvedIp = InetAddress.getByName(ipOrHostLine).getHostAddress();
-                        if (!ipOrHostLine.equals(resolvedIp)) {
-                            logger.debug("Resolved host '{}' to ip '{}'", ipOrHostLine, resolvedIp);
-                        }
-                        ipCidr = resolvedIp + "/32";
-                        logger.debug("Adding CIDR to exact IP: {}", ipCidr);
-                    } catch (final UnknownHostException uhe) {
-                        logger.warn("Firewall is skipping unknown host address: {}", ipOrHostLine);
-                        continue;
-                    }
-                }
-
-                try {
-                    logger.debug("Adding CIDR IP to firewall: {}", ipCidr);
-                    final SubnetUtils subnetUtils = new SubnetUtils(ipCidr);
-                    subnetUtils.setInclusiveHostCount(true);
-                    subnetInfos.add(subnetUtils.getInfo());
-                    totalIpsAdded++;
-                } catch (final IllegalArgumentException iae) {
-                    logger.warn("Firewall is skipping invalid CIDR address: {}", ipOrHostLine);
-                }
-
-            }
-
-            if (totalIpsAdded == 0) {
-                logger.info("No IPs added to firewall. Firewall will accept all requests.");
-            } else {
-                logger.info("Added {} IP(s) to firewall. Only requests originating from the configured IPs will be accepted.", totalIpsAdded);
-            }
-
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
deleted file mode 100644
index eedb88f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-
-/**
- * A dataflow bundled with cluster-level information, currently the identifier
- * stored as the primary node id. Both references are fixed at construction.
- *
- * @author unattributed
- */
-public class ClusterDataFlow {
-
-    private final NodeIdentifier primaryNodeId;
-
-    private final StandardDataFlow dataFlow;
-
-    /**
-     * @param dataFlow the dataflow to wrap
-     * @param primaryNodeId the node identifier to associate with the dataflow
-     */
-    public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
-        this.primaryNodeId = primaryNodeId;
-        this.dataFlow = dataFlow;
-    }
-
-    /**
-     * @return the dataflow given at construction
-     */
-    public StandardDataFlow getDataFlow() {
-        return this.dataFlow;
-    }
-
-    /**
-     * @return the primary node identifier given at construction
-     */
-    public NodeIdentifier getPrimaryNodeId() {
-        return this.primaryNodeId;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
deleted file mode 100644
index 6ff15a7..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DaoException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * Unchecked base type for failures raised by data access objects.
- *
- * @author unattributed
- */
-public class DaoException extends RuntimeException {
-
-    /**
-     * Creates an exception with neither a message nor a cause.
-     */
-    public DaoException() {
-        super();
-    }
-
-    /**
-     * @param msg the detail message
-     */
-    public DaoException(final String msg) {
-        super(msg);
-    }
-
-    /**
-     * @param cause the underlying failure
-     */
-    public DaoException(final Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * @param msg the detail message
-     * @param cause the underlying failure
-     */
-    public DaoException(final String msg, final Throwable cause) {
-        super(msg, cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
deleted file mode 100644
index a273704..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowDao.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.flow;
-
-/**
- * A data access object for loading and saving the flow managed by the cluster.
- *
- * @author unattributed
- */
-public interface DataFlowDao {
-
- /**
- * Loads the cluster's dataflow.
- *
- * @return the dataflow or null if no dataflow exists
- *
- * @throws DaoException if the dataflow was unable to be loaded
- */
- ClusterDataFlow loadDataFlow() throws DaoException;
-
- /**
- * Saves the cluster's dataflow.
- *
- * @param dataFlow the cluster dataflow to save
- *
- * @throws DaoException if the dataflow was unable to be saved
- */
- void saveDataFlow(ClusterDataFlow dataFlow) throws DaoException;
-
- /**
- * Sets the state of the dataflow. If the dataflow does not exist, then an
- * exception is thrown.
- *
- * @param flowState the state of the dataflow
- *
- * @throws DaoException if the state was unable to be updated
- */
- void setPersistedFlowState(PersistedFlowState flowState) throws DaoException;
-
- /**
- * Gets the state of the dataflow.
- *
- * @return the state of the dataflow
- *
- * @throws DaoException if the state was unable to be retrieved
- */
- PersistedFlowState getPersistedFlowState() throws DaoException;
-}