You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/21 14:04:25 UTC
[13/64] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
deleted file mode 100644
index 35380dd..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpRequest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.testutils;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.core.MediaType;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Encapsulates an HTTP request that was parsed by {@link HttpRequestBuilder}.
- * Instances are obtained by calling {@link #createFromRequestLine(String)},
- * feeding header lines and the body to the returned builder, and finally
- * calling {@link HttpRequestBuilder#build()}.
- *
- * The toString method returns the raw request text accumulated by the builder:
- * the request line, the header lines and the body, separated by "\n".
- *
- * @author unattributed
- */
-public class HttpRequest {
-
-    private String method;
-    private String uri;
-    private String rawUri;
-    private String version;
-    private String body;
-    private String rawRequest;
-    private final Map<String, String> headers = new HashMap<>();
-    private final Map<String, List<String>> parameters = new HashMap<>();
-
-    /**
-     * Starts building a request from its first line.
-     *
-     * @param requestLine the request line, e.g. "GET /path?a=b HTTP/1.1"
-     * @return a builder seeded with the method, URI and version of the line
-     * @throws IllegalArgumentException if the line does not consist of exactly
-     * three space-separated tokens
-     */
-    public static HttpRequestBuilder createFromRequestLine(final String requestLine) {
-        return new HttpRequestBuilder(requestLine);
-    }
-
-    /**
-     * @return the request body, or the empty string if no body was read
-     */
-    public String getBody() {
-        return body;
-    }
-
-    /**
-     * @return an unmodifiable view of the headers, keyed by the header name
-     * exactly as received; values are the text following the first colon,
-     * unmodified (leading whitespace is retained)
-     */
-    public Map<String, String> getHeaders() {
-        return Collections.unmodifiableMap(headers);
-    }
-
-    /**
-     * Looks up a header value, ignoring the case of the header name.
-     *
-     * @param header the header name
-     * @return the value of the first matching header, or null if none matches
-     */
-    public String getHeaderValue(final String header) {
-        for (final Map.Entry<String, String> entry : getHeaders().entrySet()) {
-            if (entry.getKey().equalsIgnoreCase(header)) {
-                return entry.getValue();
-            }
-        }
-        return null;
-    }
-
-    public String getMethod() {
-        return method;
-    }
-
-    /**
-     * @return an unmodifiable copy of the parameter map whose value lists are
-     * unmodifiable as well
-     */
-    public Map<String, List<String>> getParameters() {
-        final Map<String, List<String>> result = new HashMap<>();
-        for (final Map.Entry<String, List<String>> entry : parameters.entrySet()) {
-            result.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-        }
-        return Collections.unmodifiableMap(result);
-    }
-
-    /**
-     * @return the request target without its query string
-     */
-    public String getUri() {
-        return uri;
-    }
-
-    /**
-     * @return the request target exactly as it appeared on the request line
-     */
-    public String getRawUri() {
-        return rawUri;
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    @Override
-    public String toString() {
-        return rawRequest;
-    }
-
-    /**
-     * A builder for constructing basic HTTP requests. It handles only enough of
-     * the HTTP specification to support basic unit testing, and it should not
-     * be used otherwise.
-     */
-    public static class HttpRequestBuilder {
-
-        private final String method;
-        private final String uri;
-        private final String rawUri;
-        private final String version;
-        private final Map<String, String> headers = new HashMap<>();
-        private final Map<String, List<String>> parameters = new HashMap<>();
-        private int contentLength = 0;
-        private String contentType;
-        private String body = "";
-        private final StringBuilder rawRequest = new StringBuilder();
-
-        private HttpRequestBuilder(final String requestLine) {
-
-            final String[] tokens = requestLine.split(" ");
-            if (tokens.length != 3) {
-                throw new IllegalArgumentException("Invalid HTTP Request Line: " + requestLine);
-            }
-
-            method = tokens[0];
-            rawUri = tokens[1];
-            version = tokens[2];
-
-            // The URI is recorded for every method (it used to stay null for
-            // POST, PUT, ...). Query-string parameters are still only parsed
-            // for the methods that carry their parameters in the URL.
-            final int queryIndex = rawUri.indexOf("?");
-            if (queryIndex > -1) {
-                uri = rawUri.substring(0, queryIndex);
-                if (readsParametersFromQueryString(method)) {
-                    addParameters(rawUri.substring(queryIndex + 1));
-                }
-            } else {
-                uri = rawUri;
-            }
-            rawRequest.append(requestLine).append("\n");
-        }
-
-        // true for the methods whose parameters are taken from the query string
-        private static boolean readsParametersFromQueryString(final String method) {
-            return HttpMethod.GET.equalsIgnoreCase(method)
-                    || HttpMethod.HEAD.equalsIgnoreCase(method)
-                    || HttpMethod.DELETE.equalsIgnoreCase(method)
-                    || HttpMethod.OPTIONS.equalsIgnoreCase(method);
-        }
-
-        // records a header and tracks content-length / content-type for body handling
-        private void addHeader(final String key, final String value) {
-            if (key.contains(" ")) {
-                throw new IllegalArgumentException("Header key may not contain spaces.");
-            } else if ("content-length".equalsIgnoreCase(key)) {
-                final String trimmed = value.trim();
-                contentLength = StringUtils.isBlank(trimmed) ? 0 : Integer.parseInt(trimmed);
-            } else if ("content-type".equalsIgnoreCase(key)) {
-                contentType = value.trim();
-            }
-            // the value is stored as received so existing consumers see no change
-            headers.put(key, value);
-        }
-
-        /**
-         * Adds one raw header line of the form "Name: value".
-         *
-         * @param header the header line without its line terminator
-         * @throws IllegalArgumentException if the line contains no colon, the
-         * header name contains a space, or a content-length value is not an
-         * integer
-         */
-        public void addHeader(final String header) {
-            final int firstColonIndex = header.indexOf(":");
-            if (firstColonIndex < 0) {
-                throw new IllegalArgumentException("Invalid HTTP Header line: " + header);
-            }
-            addHeader(header.substring(0, firstColonIndex), header.substring(firstColonIndex + 1));
-            rawRequest.append(header).append("\n");
-        }
-
-        /**
-         * Parses a query string ("a=1&amp;b=2", optionally prefixed with "?")
-         * and adds each URL-decoded key/value pair as a parameter. A pair
-         * without "=" or without a value is added with the empty string as its
-         * value; empty pairs (as in "a=1&amp;&amp;b=2") are skipped. Only the
-         * first "=" of a pair separates the key from the value.
-         *
-         * @param queryString the query string; blank input is ignored
-         */
-        // final because constructor calls it
-        public final void addParameters(final String queryString) {
-
-            if (StringUtils.isBlank(queryString)) {
-                return;
-            }
-
-            final String normQueryString = queryString.startsWith("?") ? queryString.substring(1) : queryString;
-
-            for (final String keyValuePair : normQueryString.split("&")) {
-                if (keyValuePair.isEmpty()) {
-                    continue;
-                }
-                final int separatorIndex = keyValuePair.indexOf('=');
-                final String rawKey = (separatorIndex < 0) ? keyValuePair : keyValuePair.substring(0, separatorIndex);
-                final String rawValue = (separatorIndex < 0) ? "" : keyValuePair.substring(separatorIndex + 1);
-                try {
-                    addParameter(URLDecoder.decode(rawKey, "utf-8"), URLDecoder.decode(rawValue, "utf-8"));
-                } catch (UnsupportedEncodingException use) {
-                    throw new RuntimeException(use);
-                }
-            }
-        }
-
-        /**
-         * Adds a single, already decoded parameter value. Repeated keys
-         * accumulate their values in insertion order.
-         *
-         * @param key the parameter name
-         * @param value the parameter value
-         * @throws IllegalArgumentException if the key contains a space
-         */
-        public void addParameter(final String key, final String value) {
-
-            if (key.contains(" ")) {
-                throw new IllegalArgumentException("Parameter key may not contain spaces: " + key);
-            }
-
-            List<String> values = parameters.get(key);
-            if (values == null) {
-                values = new ArrayList<>();
-                parameters.put(key, values);
-            }
-            values.add(value);
-        }
-
-        /**
-         * Reads the request body from the reader. At most content-length
-         * characters are read; nothing is read when no positive content-length
-         * header has been added. If the reader ends early, the body consists of
-         * the characters that were actually read.
-         *
-         * NOTE(review): content-length counts bytes while this method counts
-         * characters, so a body with multi-byte characters is presumably not
-         * handled correctly - confirm before relying on it.
-         *
-         * @param reader the source positioned at the first body character
-         * @throws IOException if reading fails
-         */
-        public void addBody(final Reader reader) throws IOException {
-
-            if (contentLength <= 0) {
-                return;
-            }
-
-            final char[] buf = new char[contentLength];
-            int offset = 0;
-            int numRead;
-            while (offset < buf.length && (numRead = reader.read(buf, offset, buf.length - offset)) >= 0) {
-                offset += numRead;
-            }
-            // only keep what was read; the unused tail of buf holds NUL characters
-            body = new String(buf, 0, offset);
-            rawRequest.append("\n");
-            rawRequest.append(body);
-        }
-
-        // true if the body is a URL-encoded form whose fields should become parameters
-        private boolean hasFormEncodedBody() {
-            if (contentType == null) {
-                return false;
-            }
-            if (HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method)) {
-                return false;
-            }
-            // ignore media type parameters such as "; charset=UTF-8"
-            final int parameterIndex = contentType.indexOf(';');
-            final String mediaType = (parameterIndex < 0) ? contentType : contentType.substring(0, parameterIndex);
-            return MediaType.APPLICATION_FORM_URLENCODED.equalsIgnoreCase(mediaType.trim());
-        }
-
-        /**
-         * Creates the request. For methods other than GET and HEAD whose
-         * content type is application/x-www-form-urlencoded, the body is parsed
-         * and its fields are added as parameters first.
-         *
-         * @return the request; it owns independent copies of the builder's
-         * header and parameter collections
-         * @throws UnsupportedEncodingException declared for backward
-         * compatibility with existing callers
-         */
-        public HttpRequest build() throws UnsupportedEncodingException {
-
-            if (hasFormEncodedBody()) {
-                addParameters(body);
-            }
-
-            final HttpRequest request = new HttpRequest();
-            request.method = this.method;
-            request.uri = this.uri;
-            request.rawUri = this.rawUri;
-            request.version = this.version;
-            request.headers.putAll(this.headers);
-            // copy the value lists so later builder changes do not leak into the request
-            for (final Map.Entry<String, List<String>> entry : this.parameters.entrySet()) {
-                request.parameters.put(entry.getKey(), new ArrayList<>(entry.getValue()));
-            }
-            request.body = this.body;
-            request.rawRequest = this.rawRequest.toString();
-
-            return request;
-        }
-
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
deleted file mode 100644
index 3aa2931..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponse.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.testutils;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import javax.ws.rs.core.Response.Status;
-
-/**
- * Encapsulates an HTTP response consisting of a status, a set of headers and
- * a string entity. The toString method renders the status line, the headers
- * and the entity, separating lines with "\n".
- *
- * @author unattributed
- */
-public class HttpResponse {
-
-    private final Status status;
-    private final String entity;
-    private final Map<String, String> headers = new HashMap<>();
-
-    /**
-     * Creates a response and records its "content-length" header, computed
-     * from the entity's bytes in the platform default charset.
-     *
-     * @param status the response status
-     * @param entity the response body; must not be null
-     */
-    public HttpResponse(final Status status, final String entity) {
-        this.status = status;
-        this.entity = entity;
-        final int entityLength = entity.getBytes().length;
-        headers.put("content-length", String.valueOf(entityLength));
-    }
-
-    public String getEntity() {
-        return entity;
-    }
-
-    public Status getStatus() {
-        return status;
-    }
-
-    /**
-     * @return an unmodifiable view of the response headers
-     */
-    public Map<String, String> getHeaders() {
-        return Collections.unmodifiableMap(headers);
-    }
-
-    /**
-     * Adds or replaces a header.
-     *
-     * @param key the header name; must not contain spaces and must not be
-     * "content-length" in any letter case
-     * @param value the header value
-     * @throws IllegalArgumentException if the key is rejected
-     */
-    public void addHeader(final String key, final String value) {
-        if (key.contains(" ")) {
-            throw new IllegalArgumentException("Header key may not contain spaces.");
-        }
-        if ("content-length".equalsIgnoreCase(key)) {
-            throw new IllegalArgumentException("Content-Length header is set automatically based on length of content.");
-        }
-        headers.put(key, value);
-    }
-
-    /**
-     * Adds every entry of the given map through {@link #addHeader(String, String)}.
-     *
-     * @param headers the headers to add
-     */
-    public void addHeaders(final Map<String, String> headers) {
-        for (final Map.Entry<String, String> header : headers.entrySet()) {
-            addHeader(header.getKey(), header.getValue());
-        }
-    }
-
-    @Override
-    public String toString() {
-
-        // status line
-        final StringBuilder text = new StringBuilder("HTTP/1.1 ");
-        text.append(status.getStatusCode());
-        text.append(" ");
-        text.append(status.getReasonPhrase());
-        text.append("\n");
-
-        // one "name: value" line per header
-        for (final Map.Entry<String, String> header : headers.entrySet()) {
-            text.append(header.getKey());
-            text.append(": ");
-            text.append(header.getValue());
-            text.append("\n");
-        }
-
-        // a blank line separates the headers from the entity
-        text.append("\n");
-        text.append(entity);
-
-        return text.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
deleted file mode 100644
index 28615d0..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpResponseAction.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.testutils;
-
-/**
- * Wraps a HttpResponse with a time-delay. When the action is applied, the
- * currently executing thread sleeps for the given delay before returning the
- * response to the caller.
- *
- * This class is good for simulating network latency.
- *
- * @author unattributed
- */
-public class HttpResponseAction {
-
-    private final HttpResponse response;
-
-    private final int waitTimeMs;
-
-    /**
-     * Creates an action that returns the response without any delay.
-     *
-     * @param response the response to return from {@link #apply()}
-     */
-    public HttpResponseAction(final HttpResponse response) {
-        this(response, 0);
-    }
-
-    /**
-     * Creates an action that returns the response after a delay.
-     *
-     * @param response the response to return from {@link #apply()}
-     * @param waitTimeMs the number of milliseconds to sleep in {@link #apply()}
-     */
-    public HttpResponseAction(final HttpResponse response, final int waitTimeMs) {
-        this.response = response;
-        this.waitTimeMs = waitTimeMs;
-    }
-
-    /**
-     * Sleeps for the configured delay and then returns the response.
-     *
-     * @return the wrapped response
-     * @throws RuntimeException wrapping the InterruptedException if the
-     * sleeping thread is interrupted; the thread's interrupt status is
-     * restored before the exception is thrown
-     */
-    public HttpResponse apply() {
-        try {
-            Thread.sleep(waitTimeMs);
-        } catch (final InterruptedException ie) {
-            // restore the flag so code further up the stack can still observe the interruption
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(ie);
-        }
-
-        return response;
-    }
-
-    public HttpResponse getResponse() {
-        return response;
-    }
-
-    public int getWaitTimeMs() {
-        return waitTimeMs;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
deleted file mode 100644
index f17a66c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/manager/testutils/HttpServer.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.manager.testutils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Reader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.nifi.cluster.manager.testutils.HttpRequest.HttpRequestBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A simple HTTP web server that allows clients to register canned-responses to
- * respond to received requests.
- *
- * @author unattributed
- */
-public class HttpServer {
-
- private static final Logger logger = LoggerFactory.getLogger(HttpServer.class);
-
- private final ExecutorService executorService;
- private final ServerSocket serverSocket;
- private final Queue<HttpResponseAction> responseQueue = new ConcurrentLinkedQueue<>();
- private final Map<String, String> checkedHeaders = new HashMap<>();
- private final Map<String, List<String>> checkedParameters = new HashMap<>();
- private final int port;
-
- public HttpServer(int numThreads, int port) throws IOException {
- this.port = port;
- executorService = Executors.newFixedThreadPool(numThreads);
- serverSocket = new ServerSocket(port);
- }
-
- public void start() {
-
- new Thread() {
- @Override
- public void run() {
- while (isRunning()) {
- try {
- final Socket conn = serverSocket.accept();
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- handleRequest(conn);
- if (conn.isClosed() == false) {
- try {
- conn.close();
- } catch (IOException ioe) {
- }
- }
- }
- });
- } catch (final SocketException se) {
- /* ignored */
- } catch (final IOException ioe) {
- if (logger.isDebugEnabled()) {
- logger.warn("", ioe);
- }
- }
- }
- }
- ;
- }
-
- .start();
- }
-
- public boolean isRunning() {
- return executorService.isShutdown() == false;
- }
-
- public void stop() {
- // shutdown server socket
- try {
- if (serverSocket.isClosed() == false) {
- serverSocket.close();
- }
- } catch (final Exception ex) {
- throw new RuntimeException(ex);
- }
-
- // shutdown executor service
- try {
- executorService.shutdown();
- executorService.awaitTermination(3, TimeUnit.SECONDS);
- } catch (final Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- public int getPort() {
- if (isRunning()) {
- return serverSocket.getLocalPort();
- } else {
- return port;
- }
- }
-
- public Queue<HttpResponseAction> addResponseAction(final HttpResponseAction response) {
- responseQueue.add(response);
- return responseQueue;
- }
-
- public void addCheckedHeaders(final Map<String, String> headers) {
- checkedHeaders.putAll(headers);
- }
-
- public void addCheckedParameters(final Map<String, List<String>> parameters) {
- checkedParameters.putAll(parameters);
- }
-
- private void handleRequest(final Socket conn) {
- try {
-
- final HttpRequest httpRequest = buildRequest(conn.getInputStream());
-
- if (logger.isDebugEnabled()) {
- logger.debug("\n" + httpRequest);
- }
-
- // check headers
- final Map<String, String> reqHeaders = httpRequest.getHeaders();
- for (final Map.Entry<String, String> entry : checkedHeaders.entrySet()) {
- if (reqHeaders.containsKey(entry.getKey())) {
- if (entry.getValue().equals(reqHeaders.get(entry.getKey()))) {
- logger.error("Incorrect HTTP request header value received for checked header: " + entry.getKey());
- conn.close();
- return;
- }
- } else {
- logger.error("Missing checked header: " + entry.getKey());
- conn.close();
- return;
- }
- }
-
- // check parameters
- final Map<String, List<String>> reqParams = httpRequest.getParameters();
- for (final Map.Entry<String, List<String>> entry : checkedParameters.entrySet()) {
- if (reqParams.containsKey(entry.getKey())) {
- if (entry.getValue().equals(reqParams.get(entry.getKey())) == false) {
- logger.error("Incorrect HTTP request parameter values received for checked parameter: " + entry.getKey());
- conn.close();
- return;
- }
- } else {
- logger.error("Missing checked parameter: " + entry.getKey());
- conn.close();
- return;
- }
- }
-
- // apply the next response
- final HttpResponseAction response = responseQueue.remove();
- response.apply();
-
- // send the response to client
- final PrintWriter pw = new PrintWriter(conn.getOutputStream(), true);
-
- if (logger.isDebugEnabled()) {
- logger.debug("\n" + response.getResponse());
- }
-
- pw.print(response.getResponse());
- pw.flush();
-
- } catch (IOException ioe) { /* ignored */ }
- }
-
- private HttpRequest buildRequest(final InputStream requestIs) throws IOException {
- return new HttpRequestReader().read(new InputStreamReader(requestIs));
- }
-
- // reads an HTTP request from the given reader
- private class HttpRequestReader {
-
- public HttpRequest read(final Reader reader) throws IOException {
-
- HttpRequestBuilder builder = null;
- String line = "";
- boolean isRequestLine = true;
- while ((line = readLine(reader)).isEmpty() == false) {
- if (isRequestLine) {
- builder = HttpRequest.createFromRequestLine(line);
- isRequestLine = false;
- } else {
- builder.addHeader(line);
- }
- }
-
- if (builder != null) {
- builder.addBody(reader);
- }
-
- return builder.build();
- }
-
- private String readLine(final Reader reader) throws IOException {
-
- /* read character at time to prevent blocking */
- final StringBuilder strb = new StringBuilder();
- char c;
- while ((c = (char) reader.read()) != '\n') {
- if (c != '\r') {
- strb.append(c);
- }
- }
- return strb.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
deleted file mode 100644
index 96943c2..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Tests ClusterManagerProtocolSenderImpl#requestFlow against a
- * SocketProtocolListener whose only handler is a Mockito mock, so each test
- * controls what the listener answers.
- *
- * @author unattributed
- */
-public class ClusterManagerProtocolSenderImplTest {
-
-    private InetAddress address;
-
-    private int port;
-
-    private SocketProtocolListener listener;
-
-    private ClusterManagerProtocolSenderImpl sender;
-
-    private ProtocolHandler mockHandler;
-
-    @Before
-    public void setup() throws IOException {
-
-        address = InetAddress.getLocalHost();
-        mockHandler = mock(ProtocolHandler.class);
-
-        final ProtocolContext context = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
-        // the port actually in use is read back from the listener once it is started
-        listener = new SocketProtocolListener(5, 0, new ServerSocketConfiguration(), context);
-        listener.addHandler(mockHandler);
-        listener.start();
-        port = listener.getPort();
-
-        sender = new ClusterManagerProtocolSenderImpl(new SocketConfiguration(), context);
-    }
-
-    @After
-    public void teardown() throws IOException {
-        if (listener.isRunning()) {
-            listener.stop();
-        }
-    }
-
-    // builds a flow request addressed to the listener started in setup()
-    private FlowRequestMessage newFlowRequest() {
-        final FlowRequestMessage flowRequest = new FlowRequestMessage();
-        flowRequest.setNodeId(new NodeIdentifier("id", "api-address", 1, address.getHostAddress(), port));
-        return flowRequest;
-    }
-
-    @Test
-    public void testRequestFlow() throws Exception {
-
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
-
-        final FlowResponseMessage flowResponse = sender.requestFlow(newFlowRequest());
-
-        assertNotNull(flowResponse);
-    }
-
-    @Test
-    public void testRequestFlowWithBadResponseMessage() throws Exception {
-
-        // the handler answers with a message type that is not a flow response
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
-
-        final FlowRequestMessage flowRequest = newFlowRequest();
-        try {
-            sender.requestFlow(flowRequest);
-            fail("failed to throw exception");
-        } catch (ProtocolException pe) {
-            // expected
-        }
-
-    }
-
-    @Test
-    public void testRequestFlowDelayedResponse() throws Exception {
-
-        final int time = 250;
-        sender.getSocketConfiguration().setSocketTimeout(time);
-
-        // the handler takes three times as long as the socket timeout to answer
-        final Answer<FlowResponseMessage> delayedAnswer = new Answer<FlowResponseMessage>() {
-            @Override
-            public FlowResponseMessage answer(InvocationOnMock invocation) throws Throwable {
-                Thread.sleep(time * 3);
-                return new FlowResponseMessage();
-            }
-        };
-        when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
-        when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(delayedAnswer);
-
-        final FlowRequestMessage flowRequest = newFlowRequest();
-        try {
-            sender.requestFlow(flowRequest);
-            fail("failed to throw exception");
-        } catch (ProtocolException pe) {
-            // expected
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
deleted file mode 100644
index 4a69571..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Mockito.*;
-import org.mockito.stubbing.OngoingStubbing;
-
-public class ClusterServiceLocatorTest {
-
- private ClusterServiceDiscovery mockServiceDiscovery;
-
- private int fixedPort;
-
- private DiscoverableService fixedService;
-
- private ClusterServiceLocator serviceDiscoveryLocator;
-
- private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
-
- private ClusterServiceLocator fixedServiceLocator;
-
- @Before
- public void setup() throws Exception {
-
- fixedPort = 1;
- mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
- fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));
-
- serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
- serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
- fixedServiceLocator = new ClusterServiceLocator(fixedService);
-
- }
-
- @Test
- public void getServiceWhenServiceDiscoveryNotStarted() {
- assertNull(serviceDiscoveryLocator.getService());
- }
-
- @Test
- public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
- assertNull(serviceDiscoveryLocator.getService());
- }
-
- @Test
- public void getServiceWhenFixedServiceNotStarted() {
- assertEquals(fixedService, fixedServiceLocator.getService());
- }
-
- @Test
- public void getServiceNotOnFirstAttempt() {
-
- ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
- config.setNumAttempts(2);
- config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
- config.setTimeBetweenAttempts(1);
-
- serviceDiscoveryLocator.setAttemptsConfig(config);
-
- OngoingStubbing<DiscoverableService> stubbing = null;
- for (int i = 0; i < config.getNumAttempts() - 1; i++) {
- if (stubbing == null) {
- stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
- } else {
- stubbing.thenReturn(null);
- }
- }
- stubbing.thenReturn(fixedService);
-
- assertEquals(fixedService, serviceDiscoveryLocator.getService());
-
- }
-
- @Test
- public void getServiceNotOnFirstAttemptWithFixedPort() {
-
- ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
- config.setNumAttempts(2);
- config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
- config.setTimeBetweenAttempts(1);
-
- serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
-
- OngoingStubbing<DiscoverableService> stubbing = null;
- for (int i = 0; i < config.getNumAttempts() - 1; i++) {
- if (stubbing == null) {
- stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
- } else {
- stubbing.thenReturn(null);
- }
- }
- stubbing.thenReturn(fixedService);
-
- InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
- DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
- assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
deleted file mode 100644
index 4d85d1a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.net.InetSocketAddress;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class ClusterServicesBroadcasterTest {
-
- private ClusterServicesBroadcaster broadcaster;
-
- private MulticastProtocolListener listener;
-
- private DummyProtocolHandler handler;
-
- private InetSocketAddress multicastAddress;
-
- private DiscoverableService broadcastedService;
-
- private ProtocolContext protocolContext;
-
- private MulticastConfiguration configuration;
-
- @Before
- public void setup() throws Exception {
-
- broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
-
- multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
-
- configuration = new MulticastConfiguration();
-
- protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
- broadcaster.addService(broadcastedService);
-
- handler = new DummyProtocolHandler();
- listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
- listener.addHandler(handler);
- }
-
- @After
- public void teardown() {
-
- if (broadcaster.isRunning()) {
- broadcaster.stop();
- }
-
- try {
- if (listener.isRunning()) {
- listener.stop();
- }
- } catch (Exception ex) {
- ex.printStackTrace(System.out);
- }
-
- }
-
- @Test
- @Ignore
- public void testBroadcastReceived() throws Exception {
-
- broadcaster.start();
- listener.start();
-
- Thread.sleep(1000);
-
- listener.stop();
-
- assertNotNull(handler.getProtocolMessage());
- assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
- final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
- assertEquals(broadcastedService.getServiceName(), msg.getServiceName());
- assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
- assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
- }
-
- private class DummyProtocolHandler implements ProtocolHandler {
-
- private ProtocolMessage protocolMessage;
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- this.protocolMessage = msg;
- return null;
- }
-
- public ProtocolMessage getProtocolMessage() {
- return protocolMessage;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
deleted file mode 100644
index 6c79b90..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastUtils;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class MulticastProtocolListenerTest {
-
- private MulticastProtocolListener listener;
-
- private MulticastSocket socket;
-
- private InetSocketAddress address;
-
- private MulticastConfiguration configuration;
-
- private ProtocolContext protocolContext;
-
- @Before
- public void setup() throws Exception {
-
- address = new InetSocketAddress("226.1.1.1", 60000);
- configuration = new MulticastConfiguration();
-
- protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
- listener.start();
-
- socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
- }
-
- @After
- public void teardown() throws IOException {
- try {
- if (listener.isRunning()) {
- listener.stop();
- }
- } finally {
- MulticastUtils.closeQuietly(socket);
- }
- }
-
- @Ignore("This test must be reworked. Requires an active network connection")
- @Test
- public void testBadRequest() throws Exception {
- DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
- listener.addHandler(handler);
- DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
- socket.send(packet);
- Thread.sleep(250);
- assertEquals(0, handler.getMessages().size());
- }
-
- @Test
- @Ignore
- public void testRequest() throws Exception {
-
- ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
- listener.addHandler(handler);
-
- ProtocolMessage msg = new PingMessage();
- MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
-
- // marshal message to output stream
- ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- marshaller.marshal(multicastMsg, baos);
- byte[] requestPacketBytes = baos.toByteArray();
- DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, address);
- socket.send(packet);
-
- Thread.sleep(250);
- assertEquals(1, handler.getMessages().size());
- assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-
- }
-
- private class ReflexiveProtocolHandler implements ProtocolHandler {
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- messages.add(msg);
- return msg;
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
- }
-
- private class DelayedProtocolHandler implements ProtocolHandler {
-
- private int delay = 0;
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- public DelayedProtocolHandler(int delay) {
- this.delay = delay;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- try {
- messages.add(msg);
- Thread.sleep(delay);
- return null;
- } catch (final InterruptedException ie) {
- throw new ProtocolException(ie);
- }
-
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
deleted file mode 100644
index 7c62d2f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.UUID;
-
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.ConnectionRequest;
-import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * @author unattributed
- */
-public class NodeProtocolSenderImplTest {
-
- private SocketProtocolListener listener;
-
- private NodeProtocolSenderImpl sender;
-
- private DiscoverableService service;
-
- private ServerSocketConfiguration serverSocketConfiguration;
-
- private ClusterServiceLocator mockServiceLocator;
-
- private ProtocolHandler mockHandler;
-
- private NodeIdentifier nodeIdentifier;
-
- @Before
- public void setup() throws IOException {
-
- serverSocketConfiguration = new ServerSocketConfiguration();
-
- mockServiceLocator = mock(ClusterServiceLocator.class);
- mockHandler = mock(ProtocolHandler.class);
-
- nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
-
- ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
- listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
- listener.setShutdownListenerSeconds(3);
- listener.addHandler(mockHandler);
- listener.start();
-
- service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort()));
-
- SocketConfiguration socketConfiguration = new SocketConfiguration();
- socketConfiguration.setReuseAddress(true);
- sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext);
- }
-
- @After
- public void teardown() throws IOException {
- if (listener.isRunning()) {
- listener.stop();
- }
- }
-
- @Test
- public void testConnect() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
- mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
- ConnectionResponseMessage response = sender.requestConnection(request);
- assertNotNull(response);
- }
-
- @Test(expected = UnknownServiceAddressException.class)
- public void testConnectNoClusterManagerAddress() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(null);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage());
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
- }
-
- @Test(expected = ProtocolException.class)
- public void testConnectBadResponse() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
-
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
-
- }
-
- @Test(expected = ProtocolException.class)
- public void testConnectDelayedResponse() throws Exception {
-
- final int time = 250;
- sender.getSocketConfiguration().setSocketTimeout(time);
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<ConnectionResponseMessage>() {
- @Override
- public ConnectionResponseMessage answer(InvocationOnMock invocation) throws Throwable {
- Thread.sleep(time * 3);
- return new ConnectionResponseMessage();
- }
- });
- ConnectionRequestMessage request = new ConnectionRequestMessage();
- request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
- sender.requestConnection(request);
- fail("failed to throw exception");
-
- }
-
- @Test
- public void testHeartbeat() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
- HeartbeatMessage msg = new HeartbeatMessage();
- HeartbeatPayload hbPayload = new HeartbeatPayload();
- Heartbeat hb = new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, hbPayload.marshal());
- msg.setHeartbeat(hb);
- sender.heartbeat(msg);
- }
-
- @Test
- public void testNotifyControllerStartupFailure() throws Exception {
-
- when(mockServiceLocator.getService()).thenReturn(service);
- when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
- when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
- ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
- msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
- msg.setExceptionMessage("some exception");
- sender.notifyControllerStartupFailure(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
deleted file mode 100644
index 92a7d2a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListenerTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.jaxb.JaxbProtocolContext;
-import org.apache.nifi.cluster.protocol.jaxb.message.JaxbProtocolUtils;
-import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.testutils.DelayedProtocolHandler;
-import org.apache.nifi.cluster.protocol.testutils.ReflexiveProtocolHandler;
-import org.apache.nifi.io.socket.ServerSocketConfiguration;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.junit.After;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author unattributed
- */
-public class SocketProtocolListenerTest {
-
- private SocketProtocolListener listener;
-
- private Socket socket;
-
- private ProtocolMessageMarshaller<ProtocolMessage> marshaller;
-
- private ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
-
- @Before
- public void setup() throws Exception {
-
- final ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
- marshaller = protocolContext.createMarshaller();
- unmarshaller = protocolContext.createUnmarshaller();
-
- ServerSocketConfiguration configuration = new ServerSocketConfiguration();
- configuration.setSocketTimeout(1000);
-
- listener = new SocketProtocolListener(5, 0, configuration, protocolContext);
- listener.start();
-
- int port = listener.getPort();
-
- SocketConfiguration config = new SocketConfiguration();
- config.setReuseAddress(true);
- config.setSocketTimeout(1000);
- socket = SocketUtils.createSocket(new InetSocketAddress("localhost", port), config);
- }
-
- @After
- public void teardown() throws IOException {
- try {
- if (listener.isRunning()) {
- listener.stop();
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
- @Test
- public void testBadRequest() throws Exception {
- DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
- listener.addHandler(handler);
- socket.getOutputStream().write(5);
- Thread.sleep(250);
- assertEquals(0, handler.getMessages().size());
- }
-
- @Test
- public void testRequest() throws Exception {
- ProtocolMessage msg = new PingMessage();
-
- ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
- listener.addHandler(handler);
-
- // marshal message to output stream
- marshaller.marshal(msg, socket.getOutputStream());
-
- // unmarshall response and return
- ProtocolMessage response = unmarshaller.unmarshal(socket.getInputStream());
- assertEquals(msg.getType(), response.getType());
-
- assertEquals(1, handler.getMessages().size());
- assertEquals(msg.getType(), handler.getMessages().get(0).getType());
- }
-
- @Test
- public void testDelayedRequest() throws Exception {
- ProtocolMessage msg = new PingMessage();
-
- DelayedProtocolHandler handler = new DelayedProtocolHandler(2000);
- listener.addHandler(handler);
-
- // marshal message to output stream
- marshaller.marshal(msg, socket.getOutputStream());
-
- try {
- socket.getInputStream().read();
- fail("Socket timeout not received.");
- } catch (SocketTimeoutException ste) {
- }
-
- assertEquals(1, handler.getMessages().size());
- assertEquals(msg.getType(), handler.getMessages().get(0).getType());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
deleted file mode 100644
index 2f16777..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/DelayedProtocolHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class DelayedProtocolHandler implements ProtocolHandler {
-
- private int delay = 0;
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- public DelayedProtocolHandler(int delay) {
- this.delay = delay;
- }
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- try {
- messages.add(msg);
- Thread.sleep(delay);
- return null;
- } catch (final InterruptedException ie) {
- throw new ProtocolException(ie);
- }
-
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
deleted file mode 100644
index e80f52c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/java/org/apache/nifi/cluster/protocol/testutils/ReflexiveProtocolHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.testutils;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * @author unattributed
- */
-public class ReflexiveProtocolHandler implements ProtocolHandler {
-
- private List<ProtocolMessage> messages = new ArrayList<>();
-
- @Override
- public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
- messages.add(msg);
- return msg;
- }
-
- @Override
- public boolean canHandle(ProtocolMessage msg) {
- return true;
- }
-
- public List<ProtocolMessage> getMessages() {
- return messages;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
deleted file mode 100644
index 92eb78c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration scan="true" scanPeriod="30 seconds">
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
- <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
- </encoder>
- </appender>
-
- <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
- <logger name="org.apache.nifi" level="INFO"/>
-
- <!-- Logger for managing logging statements for nifi clusters. -->
- <logger name="org.apache.nifi.cluster" level="INFO"/>
-
- <!--
- Logger for logging HTTP requests received by the web server. Setting
- log level to 'debug' activates HTTP request logging.
- -->
- <logger name="org.apache.nifi.server.JettyServer" level="INFO"/>
-
- <!-- Logger for managing logging statements for jetty -->
- <logger name="org.mortbay" level="INFO"/>
-
- <!-- Suppress non-error messages due to excessive logging by class -->
- <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/>
-
- <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- </root>
-
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/empty.txt
deleted file mode 100755
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt b/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
deleted file mode 100755
index e8e4c2b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/test/resources/org/apache/nifi/cluster/firewall/impl/ips.txt
+++ /dev/null
@@ -1,12 +0,0 @@
-
-bad data should be skipped
-
-# this is a comment
- 2.2.2.2 # this is another comment ####
-3.3.3.3/8
-
-4.4.4.4/24
-
-5.5.5.255/31
-
-more bad data
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/.gitignore
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/.gitignore b/nifi/nar-bundles/framework-bundle/framework/core-api/.gitignore
deleted file mode 100755
index ea8c4bf..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
deleted file mode 100644
index f8d8e13..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <!-- This module inherits from the nifi-framework-parent POM. -->
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-framework-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>core-api</artifactId>
- <!-- Same value as the parent version declared above. -->
- <version>0.0.1-incubating-SNAPSHOT</version>
- <name>NiFi Core API</name>
- <!--
- None of the dependencies below declares a version. Presumably the versions
- are supplied by a dependencyManagement section in a parent POM, which is
- not visible from this file (to be confirmed).
- -->
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-runtime</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>client-dto</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- <dependency>
- <groupId>org.quartz-scheduler</groupId>
- <artifactId>quartz</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
deleted file mode 100644
index 0092f7a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-/**
- * Mutable bean holding a node's hostname, site-to-site port, API port,
- * site-to-site security flag and total FlowFile count, each exposed through a
- * plain getter/setter pair with no validation.
- *
- * NOTE(review): the fields mirror those of the immutable NodeInformation
- * class; presumably this is the form that NodeInformationAdapter converts to
- * and from for JAXB. Confirm against that adapter.
- */
-public class AdaptedNodeInformation {
-
- private String hostname;
- // Declared as Integer rather than int, so this field can hold null.
- private Integer siteToSitePort;
- private int apiPort;
- private boolean isSiteToSiteSecure;
- private int totalFlowFiles;
-
- /** @return the hostname as last set, or null if never set */
- public String getHostname() {
- return hostname;
- }
-
- /** @param hostname the hostname to store; stored as given */
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-
- /** @return the site-to-site port as last set; may be null */
- public Integer getSiteToSitePort() {
- return siteToSitePort;
- }
-
- /** @param siteToSitePort the site-to-site port to store; null is accepted */
- public void setSiteToSitePort(Integer siteToSitePort) {
- this.siteToSitePort = siteToSitePort;
- }
-
- /** @return the API port as last set (0 if never set) */
- public int getApiPort() {
- return apiPort;
- }
-
- /** @param apiPort the API port to store */
- public void setApiPort(int apiPort) {
- this.apiPort = apiPort;
- }
-
- /** @return the site-to-site security flag as last set (false if never set) */
- public boolean isSiteToSiteSecure() {
- return isSiteToSiteSecure;
- }
-
- /** @param isSiteToSiteSecure the site-to-site security flag to store */
- public void setSiteToSiteSecure(boolean isSiteToSiteSecure) {
- this.isSiteToSiteSecure = isSiteToSiteSecure;
- }
-
- /** @return the total FlowFile count as last set (0 if never set) */
- public int getTotalFlowFiles() {
- return totalFlowFiles;
- }
-
- /** @param totalFlowFiles the total FlowFile count to store */
- public void setTotalFlowFiles(int totalFlowFiles) {
- this.totalFlowFiles = totalFlowFiles;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
deleted file mode 100644
index 5751c32..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-/**
- * JAXB root element that wraps a collection of NodeInformation objects and
- * can write itself to, or be read from, a stream via JAXB.
- */
-@XmlRootElement
-public class ClusterNodeInformation {
-
- private Collection<NodeInformation> nodeInfo;
-
- // Created once in the static initializer below and reused by every
- // marshal/unmarshal call.
- private static final JAXBContext JAXB_CONTEXT;
-
- static {
- try {
- JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class);
- } catch (JAXBException e) {
- // Rethrown unchecked (cause preserved), so a failure here aborts
- // initialization of this class.
- throw new RuntimeException("Unable to create JAXBContext.", e);
- }
- }
-
- /**
- * Creates an instance whose node collection is null until
- * setNodeInformation is called.
- */
- public ClusterNodeInformation() {
- this.nodeInfo = null;
- }
-
- /**
- * Stores the given collection by reference; no copy is made.
- *
- * @param nodeInfo the node information to hold; null is accepted
- */
- public void setNodeInformation(final Collection<NodeInformation> nodeInfo) {
- this.nodeInfo = nodeInfo;
- }
-
- /**
- * Returns the stored collection itself, not a copy. The annotation tells
- * JAXB to use NodeInformationAdapter for this property.
- *
- * @return the node information as last set; may be null
- */
- @XmlJavaTypeAdapter(NodeInformationAdapter.class)
- public Collection<NodeInformation> getNodeInformation() {
- return nodeInfo;
- }
-
- /**
- * Marshals this object to the given stream using a Marshaller created for
- * this call. The stream is not closed here.
- *
- * @param os the stream to write to
- * @throws JAXBException if creating the marshaller or marshalling fails
- */
- public void marshal(final OutputStream os) throws JAXBException {
- final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(this, os);
- }
-
- /**
- * Unmarshals a ClusterNodeInformation from the given stream using an
- * Unmarshaller created for this call. The stream is not closed here.
- *
- * @param is the stream to read from
- * @return the unmarshalled object, cast to ClusterNodeInformation
- * @throws JAXBException if creating the unmarshaller or unmarshalling fails
- */
- public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (ClusterNodeInformation) unmarshaller.unmarshal(is);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
deleted file mode 100644
index 987ff65..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-/**
- * Supplier of a ClusterNodeInformation.
- */
-public interface NodeInformant {
-
- /**
- * @return the node information known to this informant; how it is gathered
- * and whether it may be null is left to the implementation
- */
- ClusterNodeInformation getNodeInformation();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
deleted file mode 100644
index 848eb7e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-/**
- * Immutable description of a node: hostname, site-to-site port, API port,
- * site-to-site security flag and total FlowFile count. All fields are final
- * and assigned once in the constructor.
- *
- * equals and hashCode use the hostname, both ports and the security flag;
- * totalFlowFiles takes no part in either.
- */
-public class NodeInformation {
-
- private final String hostname;
- // Integer rather than int: equals and hashCode below both handle null here.
- private final Integer siteToSitePort;
- private final int apiPort;
- private final boolean isSiteToSiteSecure;
- private final int totalFlowFiles;
-
- /**
- * Stores the arguments as given; nothing is validated.
- *
- * @param hostname the node's hostname; equals, hashCode dereference it
- * without a null check
- * @param siteToSitePort the site-to-site port; may be null
- * @param apiPort the API port
- * @param isSiteToSiteSecure the site-to-site security flag
- * @param totalFlowFiles the total FlowFile count
- */
- public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort,
- final boolean isSiteToSiteSecure, final int totalFlowFiles) {
- this.hostname = hostname;
- this.siteToSitePort = siteToSitePort;
- this.apiPort = apiPort;
- this.isSiteToSiteSecure = isSiteToSiteSecure;
- this.totalFlowFiles = totalFlowFiles;
- }
-
- /** @return the hostname given at construction */
- public String getHostname() {
- return hostname;
- }
-
- /** @return the API port given at construction */
- public int getAPIPort() {
- return apiPort;
- }
-
- /** @return the site-to-site port given at construction; may be null */
- public Integer getSiteToSitePort() {
- return siteToSitePort;
- }
-
- /** @return the site-to-site security flag given at construction */
- public boolean isSiteToSiteSecure() {
- return isSiteToSiteSecure;
- }
-
- /** @return the total FlowFile count given at construction */
- public int getTotalFlowFiles() {
- return totalFlowFiles;
- }
-
- /**
- * Two instances are equal when hostname, site-to-site port (both null, or
- * the same int value), API port and security flag all match.
- * totalFlowFiles is not compared.
- *
- * @param obj the object to compare with
- * @return true if obj is a NodeInformation matching on those four fields
- */
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof NodeInformation)) {
- return false;
- }
-
- final NodeInformation other = (NodeInformation) obj;
- // Throws NullPointerException if this.hostname is null.
- if (!hostname.equals(other.hostname)) {
- return false;
- }
- if (siteToSitePort == null && other.siteToSitePort != null) {
- return false;
- }
- if (siteToSitePort != null && other.siteToSitePort == null) {
- return false;
- } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) {
- // other.siteToSitePort is non-null here: the branch above already
- // returned for the "this non-null, other null" case.
- return false;
- }
- if (apiPort != other.apiPort) {
- return false;
- }
- if (isSiteToSiteSecure != other.isSiteToSiteSecure) {
- return false;
- }
- return true;
- }
-
- /**
- * Sums a constant with contributions from the same four fields that equals
- * compares; a null site-to-site port contributes 8. Throws
- * NullPointerException if hostname is null.
- */
- @Override
- public int hashCode() {
- return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
- }
-
- /** @return a string of the form Node[hostname:apiPort] */
- @Override
- public String toString() {
- return "Node[" + hostname + ":" + apiPort + "]";
- }
-}