You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/02 21:30:35 UTC
[10/10] nifi git commit: merge of master and includes tcp and udp socket changes - requires rebase before merging into master
merge of master and includes tcp and udp socket changes - requires rebase before merging into master
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d759fec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d759fec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d759fec
Branch: refs/heads/NIFI-274
Commit: 3d759feca529d09f940ffada511a88d689c33d97
Parents: 7f58b2a 664bda8
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Nov 2 15:28:26 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Nov 2 15:28:47 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/action/Action.java | 50 ++
.../java/org/apache/nifi/action/Component.java | 34 +
.../java/org/apache/nifi/action/Operation.java | 37 +
.../component/details/ComponentDetails.java | 26 +
.../component/details/ExtensionDetails.java | 26 +
.../details/RemoteProcessGroupDetails.java | 26 +
.../nifi/action/details/ActionDetails.java | 26 +
.../nifi/action/details/ConfigureDetails.java | 30 +
.../nifi/action/details/ConnectDetails.java | 40 +
.../apache/nifi/action/details/MoveDetails.java | 30 +
.../nifi/action/details/PurgeDetails.java | 28 +
.../apache/nifi/processor/ProcessSession.java | 27 +
.../nifi/provenance/ProvenanceEventType.java | 31 +-
.../nifi/provenance/ProvenanceReporter.java | 37 +
.../org/apache/nifi/reporting/EventAccess.java | 13 +
.../client/socket/EndpointConnectionPool.java | 3 +-
.../src/main/asciidoc/getting-started.adoc | 754 +++++++++++++++++
.../src/main/asciidoc/images/add-processor.png | Bin 31524 -> 92164 bytes
nifi-docs/src/main/asciidoc/user-guide.adoc | 2 +-
.../org/apache/nifi/util/MockEventAccess.java | 27 +
.../apache/nifi/util/MockProcessSession.java | 8 +
.../nifi/util/MockProvenanceReporter.java | 35 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 2 +-
.../org/apache/nifi/admin/dao/ActionDAO.java | 12 +-
.../java/org/apache/nifi/admin/dao/UserDAO.java | 3 +-
.../nifi/admin/dao/impl/StandardActionDAO.java | 93 ++-
.../nifi/admin/dao/impl/StandardUserDAO.java | 4 +-
.../apache/nifi/admin/service/AuditService.java | 18 +-
.../admin/service/action/AddActionsAction.java | 3 +-
.../service/action/PurgeActionsAction.java | 3 +-
.../service/impl/StandardAuditService.java | 24 +-
.../resources/nifi-administration-context.xml | 2 +-
.../manager/impl/ClusteredEventAccess.java | 21 +-
.../cluster/manager/impl/WebClusterManager.java | 2 +-
.../apache/nifi/controller/FlowController.java | 158 ++--
.../repository/BatchingSessionFactory.java | 5 +
.../repository/FileSystemRepository.java | 10 +-
.../repository/StandardProcessSession.java | 18 +-
.../repository/StandardProvenanceReporter.java | 31 +-
.../nifi/processor/SimpleProcessLogger.java | 20 +-
.../nifi/spring/FlowControllerFactoryBean.java | 8 +
.../src/main/resources/nifi-context.xml | 1 +
.../controller/StandardFlowServiceTest.java | 5 +-
.../repository/TestStandardProcessSession.java | 39 +-
.../nifi/processor/TestSimpleProcessLogger.java | 101 +++
.../nifi-framework/nifi-user-actions/pom.xml | 7 +
.../java/org/apache/nifi/action/Action.java | 121 ---
.../java/org/apache/nifi/action/Component.java | 34 -
.../apache/nifi/action/FlowChangeAction.java | 130 +++
.../java/org/apache/nifi/action/Operation.java | 37 -
.../component/details/ComponentDetails.java | 26 -
.../component/details/ExtensionDetails.java | 34 -
.../details/FlowChangeExtensionDetails.java | 35 +
.../FlowChangeRemoteProcessGroupDetails.java | 35 +
.../details/RemoteProcessGroupDetails.java | 34 -
.../nifi/action/details/ActionDetails.java | 26 -
.../nifi/action/details/ConfigureDetails.java | 52 --
.../nifi/action/details/ConnectDetails.java | 90 --
.../details/FlowChangeConfigureDetails.java | 55 ++
.../details/FlowChangeConnectDetails.java | 97 +++
.../action/details/FlowChangeMoveDetails.java | 65 ++
.../action/details/FlowChangePurgeDetails.java | 46 +
.../apache/nifi/action/details/MoveDetails.java | 61 --
.../nifi/action/details/PurgeDetails.java | 45 -
.../apache/nifi/audit/ControllerAuditor.java | 27 +-
.../nifi/audit/ControllerServiceAuditor.java | 43 +-
.../org/apache/nifi/audit/FunnelAuditor.java | 7 +-
.../java/org/apache/nifi/audit/NiFiAuditor.java | 25 +-
.../java/org/apache/nifi/audit/PortAuditor.java | 27 +-
.../apache/nifi/audit/ProcessGroupAuditor.java | 25 +-
.../org/apache/nifi/audit/ProcessorAuditor.java | 25 +-
.../apache/nifi/audit/RelationshipAuditor.java | 18 +-
.../nifi/audit/RemoteProcessGroupAuditor.java | 35 +-
.../apache/nifi/audit/ReportingTaskAuditor.java | 25 +-
.../org/apache/nifi/audit/SnippetAuditor.java | 26 +-
.../nifi/web/StandardNiFiServiceFacade.java | 9 +-
.../StandardNiFiWebConfigurationContext.java | 13 +-
.../apache/nifi/web/StandardNiFiWebContext.java | 13 +-
.../nifi/web/api/ApplicationResource.java | 5 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 20 +-
.../nifi/processors/hadoop/FetchHDFS.java | 2 +-
.../apache/nifi/processors/kafka/PutKafka.java | 6 +-
.../nifi-pcap-processors/.gitignore | 1 -
.../nifi-standard-processors/pom.xml | 5 +-
.../standard/AbstractSyslogProcessor.java | 5 +-
.../processors/standard/AttributesToJSON.java | 242 ++++++
.../processors/standard/FetchFileTransfer.java | 3 +-
.../nifi/processors/standard/InvokeHTTP.java | 198 +++--
.../nifi/processors/standard/ListenSyslog.java | 32 +-
.../nifi/processors/standard/MergeContent.java | 6 +-
.../apache/nifi/processors/standard/PutSQL.java | 156 ++--
.../nifi/processors/standard/ReplaceText.java | 431 +++++++---
.../processors/standard/util/SyslogEvent.java | 13 +
.../processors/standard/util/SyslogParser.java | 18 +-
.../org.apache.nifi.processor.Processor | 1 +
.../standard/TestAttributesToJSON.java | 282 +++++++
.../processors/standard/TestInvokeHTTP.java | 607 +-------------
.../processors/standard/TestInvokeHttpSSL.java | 90 ++
.../nifi/processors/standard/TestPutSQL.java | 46 +-
.../processors/standard/TestReplaceText.java | 635 +++++++++++++-
.../standard/TestReplaceTextLineByLine.java | 336 --------
.../standard/util/TestInvokeHttpCommon.java | 830 +++++++++++++++++++
.../standard/util/TestSyslogParser.java | 15 +
.../AppendLineByLineTest.txt | 11 +
.../PrependLineByLineTest.txt | 11 +
105 files changed, 5092 insertions(+), 2136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 457ec5d,9f57c9f..3105b1f
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@@ -16,6 -16,30 +16,7 @@@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.io.nio.BufferPool;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.SyslogEvent;
-import org.apache.nifi.processors.standard.util.SyslogParser;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
+
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@@@ -44,31 -64,6 +45,33 @@@ import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.SyslogEvent;
+import org.apache.nifi.processors.standard.util.SyslogParser;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
++
+import org.apache.nifi.util.file.FileUtils;
+
++
@Tags({"syslog", "listen", "udp", "tcp", "logs"})
@CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
"expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@@ -108,14 -104,8 +112,13 @@@ public class ListenSyslog extends Abstr
.defaultValue("1 MB")
.required(true)
.build();
-
-
+ public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
- .name("Max number of TCP connections")
- .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
- .addValidator(StandardValidators.createLongValidator(1, 65535, true))
- .defaultValue("2")
- .required(true)
- .build();
-
++ .name("Max number of TCP connections")
++ .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
++ .addValidator(StandardValidators.createLongValidator(1, 65535, true))
++ .defaultValue("2")
++ .required(true)
++ .build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
@@@ -178,17 -168,9 +181,17 @@@
final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String protocol = context.getProperty(PROTOCOL).getValue();
final String charSet = context.getProperty(CHARSET).getValue();
+ final int maxConnections;
+
+ if (protocol.equals(UDP_VALUE)) {
+ maxConnections = 1;
- }
- else{
++ } else {
+ maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
+ }
-
+
parser = new SyslogParser(Charset.forName(charSet));
- bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
+ bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
++
syslogEvents = new LinkedBlockingQueue<>(10);
errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
@@@ -332,37 -312,33 +336,38 @@@
@Override
public void run() {
+ final ByteBuffer buffer = bufferPool.poll();
while (!stopped) {
- final ByteBuffer buffer = bufferPool.poll();
try {
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
-
- final SocketAddress sender = datagramChannel.receive(buffer);
- if (sender == null) {
- Thread.sleep(1000L); // nothing to do so wait...
- } else {
- final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
+ int selected = selector.select();
+ if (selected > 0){
+ Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+ while (selectorKeys.hasNext()){
+ SelectionKey key = selectorKeys.next();
+ selectorKeys.remove();
+ if (!key.isValid()){
+ continue;
+ }
+ DatagramChannel channel = (DatagramChannel) key.channel();
+ SocketAddress sender;
+ buffer.clear();
+ while (!stopped && (sender = channel.receive(buffer)) != null) {
++ // TODO: Add sender address
+ final SyslogEvent event = syslogParser.parseEvent(buffer);
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ }
+ }
}
} catch (InterruptedException e) {
- stop();
+ stopped = true;
} catch (IOException e) {
logger.error("Error reading from DatagramChannel", e);
- } finally {
- if (buffer != null) {
- bufferPool.returnBuffer(buffer, 0);
- }
}
}
+ if (buffer != null) {
+ bufferPool.returnBuffer(buffer, 0);
+ }
}
@Override
@@@ -482,8 -421,6 +487,7 @@@
@Override
public void stop() {
+ selector.wakeup();
-
stopped = true;
}
@@@ -504,15 -441,6 +508,14 @@@
}
}
+ public void completeConnection(SelectionKey key) {
+ bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
+ }
+
+ public void addBackForSelection(SelectionKey key) {
+ keyQueue.offer(key);
+ selector.wakeup();
+ }
-
}
/**
@@@ -520,9 -448,9 +523,8 @@@
* processing, otherwise the buffer is returned to the buffer pool.
*/
public static class SocketChannelHandler implements Runnable {
--
- private final BufferPool bufferPool;
- private final SocketChannel socketChannel;
+ private final SelectionKey key;
+ private final SocketChannelReader dispatcher;
private final SyslogParser syslogParser;
private final BlockingQueue<SyslogEvent> syslogEvents;
private final ProcessorLog logger;
@@@ -539,52 -467,52 +541,54 @@@
@Override
public void run() {
- try {
- int bytesRead = 0;
- while (bytesRead >= 0 && !Thread.interrupted()) {
-
- final ByteBuffer buffer = bufferPool.poll();
- if (buffer == null) {
- Thread.sleep(10L);
- logger.debug("no available buffers, continuing...");
- continue;
- }
+
- try {
- // read until the buffer is full
- bytesRead = socketChannel.read(buffer);
- while (bytesRead > 0) {
- bytesRead = socketChannel.read(buffer);
- }
- buffer.flip();
-
- // go through the buffer looking for the end of each message
- int bufferLength = buffer.limit();
- for (int i = 0; i < bufferLength; i++) {
- byte currByte = buffer.get(i);
- currBytes.write(currByte);
-
- // at the end of a message so parse an event, reset the buffer, and break out of the loop
- if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
- socketChannel.socket().getInetAddress().toString());
- logger.trace(event.getFullMessage());
- syslogEvents.put(event); // block until space is available
- currBytes.reset();
- }
+ boolean eof = false;
+ SocketChannel socketChannel = null;
+ ByteBuffer socketBuffer = null;
+ try {
+ int bytesRead;
+ socketChannel = (SocketChannel) key.channel();
+ socketBuffer = (ByteBuffer) key.attachment();
+ // read until the buffer is full
+ while((bytesRead = socketChannel.read(socketBuffer)) > 0){
+ socketBuffer.flip();
+ socketBuffer.mark();
+ int total = socketBuffer.remaining();
+ // go through the buffer looking for the end of each message
+ for (int i = 0; i < total; i++) {
+ byte currByte = socketBuffer.get();
+ currBytes.write(currByte);
+ // at the end of a message so parse an event, reset the buffer, and break out of the loop
+ if (currByte == '\n') {
- final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
++ final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
++ socketChannel.socket().getInetAddress().toString());
+ logger.trace(event.getFullMessage());
+ syslogEvents.put(event); // block until space is available
+ currBytes.reset();
+ socketBuffer.mark();
}
- } finally {
- bufferPool.returnBuffer(buffer, 0);
}
+ socketBuffer.reset();
+ socketBuffer.compact();
+ logger.debug("done handling SocketChannel");
+ }
+ if( bytesRead < 0 ){
+ eof = true;
}
-
- logger.debug("done handling SocketChannel");
} catch (ClosedByInterruptException | InterruptedException e) {
- // nothing to do here
+ logger.debug("read loop interrupted, closing connection");
+ eof = true;
} catch (IOException e) {
logger.error("Error reading from channel", e);
+ eof = true;
} finally {
- IOUtils.closeQuietly(socketChannel);
+ if(eof == true){
+ dispatcher.completeConnection(key);
+ IOUtils.closeQuietly(socketChannel);
+ }
+ else {
+ dispatcher.addBackForSelection(key);
+ }
}
}