You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/02 21:30:35 UTC

[10/10] nifi git commit: merge of master and includes tcp and udp socket changes - requires rebase before merging into master

merge of master and includes tcp and udp socket changes - requires rebase before merging into master


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d759fec
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d759fec
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d759fec

Branch: refs/heads/NIFI-274
Commit: 3d759feca529d09f940ffada511a88d689c33d97
Parents: 7f58b2a 664bda8
Author: Tony Kurc <tr...@gmail.com>
Authored: Mon Nov 2 15:28:26 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Mon Nov 2 15:28:47 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/action/Action.java     |  50 ++
 .../java/org/apache/nifi/action/Component.java  |  34 +
 .../java/org/apache/nifi/action/Operation.java  |  37 +
 .../component/details/ComponentDetails.java     |  26 +
 .../component/details/ExtensionDetails.java     |  26 +
 .../details/RemoteProcessGroupDetails.java      |  26 +
 .../nifi/action/details/ActionDetails.java      |  26 +
 .../nifi/action/details/ConfigureDetails.java   |  30 +
 .../nifi/action/details/ConnectDetails.java     |  40 +
 .../apache/nifi/action/details/MoveDetails.java |  30 +
 .../nifi/action/details/PurgeDetails.java       |  28 +
 .../apache/nifi/processor/ProcessSession.java   |  27 +
 .../nifi/provenance/ProvenanceEventType.java    |  31 +-
 .../nifi/provenance/ProvenanceReporter.java     |  37 +
 .../org/apache/nifi/reporting/EventAccess.java  |  13 +
 .../client/socket/EndpointConnectionPool.java   |   3 +-
 .../src/main/asciidoc/getting-started.adoc      | 754 +++++++++++++++++
 .../src/main/asciidoc/images/add-processor.png  | Bin 31524 -> 92164 bytes
 nifi-docs/src/main/asciidoc/user-guide.adoc     |   2 +-
 .../org/apache/nifi/util/MockEventAccess.java   |  27 +
 .../apache/nifi/util/MockProcessSession.java    |   8 +
 .../nifi/util/MockProvenanceReporter.java       |  35 +-
 .../nifi/processors/aws/s3/FetchS3Object.java   |   2 +-
 .../org/apache/nifi/admin/dao/ActionDAO.java    |  12 +-
 .../java/org/apache/nifi/admin/dao/UserDAO.java |   3 +-
 .../nifi/admin/dao/impl/StandardActionDAO.java  |  93 ++-
 .../nifi/admin/dao/impl/StandardUserDAO.java    |   4 +-
 .../apache/nifi/admin/service/AuditService.java |  18 +-
 .../admin/service/action/AddActionsAction.java  |   3 +-
 .../service/action/PurgeActionsAction.java      |   3 +-
 .../service/impl/StandardAuditService.java      |  24 +-
 .../resources/nifi-administration-context.xml   |   2 +-
 .../manager/impl/ClusteredEventAccess.java      |  21 +-
 .../cluster/manager/impl/WebClusterManager.java |   2 +-
 .../apache/nifi/controller/FlowController.java  | 158 ++--
 .../repository/BatchingSessionFactory.java      |   5 +
 .../repository/FileSystemRepository.java        |  10 +-
 .../repository/StandardProcessSession.java      |  18 +-
 .../repository/StandardProvenanceReporter.java  |  31 +-
 .../nifi/processor/SimpleProcessLogger.java     |  20 +-
 .../nifi/spring/FlowControllerFactoryBean.java  |   8 +
 .../src/main/resources/nifi-context.xml         |   1 +
 .../controller/StandardFlowServiceTest.java     |   5 +-
 .../repository/TestStandardProcessSession.java  |  39 +-
 .../nifi/processor/TestSimpleProcessLogger.java | 101 +++
 .../nifi-framework/nifi-user-actions/pom.xml    |   7 +
 .../java/org/apache/nifi/action/Action.java     | 121 ---
 .../java/org/apache/nifi/action/Component.java  |  34 -
 .../apache/nifi/action/FlowChangeAction.java    | 130 +++
 .../java/org/apache/nifi/action/Operation.java  |  37 -
 .../component/details/ComponentDetails.java     |  26 -
 .../component/details/ExtensionDetails.java     |  34 -
 .../details/FlowChangeExtensionDetails.java     |  35 +
 .../FlowChangeRemoteProcessGroupDetails.java    |  35 +
 .../details/RemoteProcessGroupDetails.java      |  34 -
 .../nifi/action/details/ActionDetails.java      |  26 -
 .../nifi/action/details/ConfigureDetails.java   |  52 --
 .../nifi/action/details/ConnectDetails.java     |  90 --
 .../details/FlowChangeConfigureDetails.java     |  55 ++
 .../details/FlowChangeConnectDetails.java       |  97 +++
 .../action/details/FlowChangeMoveDetails.java   |  65 ++
 .../action/details/FlowChangePurgeDetails.java  |  46 +
 .../apache/nifi/action/details/MoveDetails.java |  61 --
 .../nifi/action/details/PurgeDetails.java       |  45 -
 .../apache/nifi/audit/ControllerAuditor.java    |  27 +-
 .../nifi/audit/ControllerServiceAuditor.java    |  43 +-
 .../org/apache/nifi/audit/FunnelAuditor.java    |   7 +-
 .../java/org/apache/nifi/audit/NiFiAuditor.java |  25 +-
 .../java/org/apache/nifi/audit/PortAuditor.java |  27 +-
 .../apache/nifi/audit/ProcessGroupAuditor.java  |  25 +-
 .../org/apache/nifi/audit/ProcessorAuditor.java |  25 +-
 .../apache/nifi/audit/RelationshipAuditor.java  |  18 +-
 .../nifi/audit/RemoteProcessGroupAuditor.java   |  35 +-
 .../apache/nifi/audit/ReportingTaskAuditor.java |  25 +-
 .../org/apache/nifi/audit/SnippetAuditor.java   |  26 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |   9 +-
 .../StandardNiFiWebConfigurationContext.java    |  13 +-
 .../apache/nifi/web/StandardNiFiWebContext.java |  13 +-
 .../nifi/web/api/ApplicationResource.java       |   5 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  20 +-
 .../nifi/processors/hadoop/FetchHDFS.java       |   2 +-
 .../apache/nifi/processors/kafka/PutKafka.java  |   6 +-
 .../nifi-pcap-processors/.gitignore             |   1 -
 .../nifi-standard-processors/pom.xml            |   5 +-
 .../standard/AbstractSyslogProcessor.java       |   5 +-
 .../processors/standard/AttributesToJSON.java   | 242 ++++++
 .../processors/standard/FetchFileTransfer.java  |   3 +-
 .../nifi/processors/standard/InvokeHTTP.java    | 198 +++--
 .../nifi/processors/standard/ListenSyslog.java  |  32 +-
 .../nifi/processors/standard/MergeContent.java  |   6 +-
 .../apache/nifi/processors/standard/PutSQL.java | 156 ++--
 .../nifi/processors/standard/ReplaceText.java   | 431 +++++++---
 .../processors/standard/util/SyslogEvent.java   |  13 +
 .../processors/standard/util/SyslogParser.java  |  18 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestAttributesToJSON.java          | 282 +++++++
 .../processors/standard/TestInvokeHTTP.java     | 607 +-------------
 .../processors/standard/TestInvokeHttpSSL.java  |  90 ++
 .../nifi/processors/standard/TestPutSQL.java    |  46 +-
 .../processors/standard/TestReplaceText.java    | 635 +++++++++++++-
 .../standard/TestReplaceTextLineByLine.java     | 336 --------
 .../standard/util/TestInvokeHttpCommon.java     | 830 +++++++++++++++++++
 .../standard/util/TestSyslogParser.java         |  15 +
 .../AppendLineByLineTest.txt                    |  11 +
 .../PrependLineByLineTest.txt                   |  11 +
 105 files changed, 5092 insertions(+), 2136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/3d759fec/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 457ec5d,9f57c9f..3105b1f
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@@ -16,6 -16,30 +16,7 @@@
   */
  package org.apache.nifi.processors.standard;
  
 -import org.apache.commons.io.IOUtils;
 -import org.apache.nifi.annotation.behavior.WritesAttribute;
 -import org.apache.nifi.annotation.behavior.WritesAttributes;
 -import org.apache.nifi.annotation.documentation.CapabilityDescription;
 -import org.apache.nifi.annotation.documentation.Tags;
 -import org.apache.nifi.annotation.lifecycle.OnScheduled;
 -import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 -import org.apache.nifi.components.PropertyDescriptor;
 -import org.apache.nifi.flowfile.FlowFile;
 -import org.apache.nifi.flowfile.attributes.CoreAttributes;
 -import org.apache.nifi.io.nio.BufferPool;
 -import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.DataUnit;
 -import org.apache.nifi.processor.ProcessContext;
 -import org.apache.nifi.processor.ProcessSession;
 -import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.exception.ProcessException;
 -import org.apache.nifi.processor.io.OutputStreamCallback;
 -import org.apache.nifi.processor.util.StandardValidators;
 -import org.apache.nifi.processors.standard.util.SyslogEvent;
 -import org.apache.nifi.processors.standard.util.SyslogParser;
 -import org.apache.nifi.stream.io.ByteArrayOutputStream;
+ 
  import java.io.IOException;
  import java.io.OutputStream;
  import java.net.InetSocketAddress;
@@@ -44,31 -64,6 +45,33 @@@ import java.util.concurrent.Executors
  import java.util.concurrent.LinkedBlockingQueue;
  import java.util.concurrent.TimeUnit;
  
 +import org.apache.commons.io.IOUtils;
 +import org.apache.nifi.annotation.behavior.WritesAttribute;
 +import org.apache.nifi.annotation.behavior.WritesAttributes;
 +import org.apache.nifi.annotation.documentation.CapabilityDescription;
 +import org.apache.nifi.annotation.documentation.Tags;
 +import org.apache.nifi.annotation.lifecycle.OnScheduled;
 +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 +import org.apache.nifi.components.PropertyDescriptor;
 +import org.apache.nifi.flowfile.FlowFile;
 +import org.apache.nifi.flowfile.attributes.CoreAttributes;
 +import org.apache.nifi.io.nio.BufferPool;
 +import org.apache.nifi.logging.ProcessorLog;
 +import org.apache.nifi.processor.DataUnit;
 +import org.apache.nifi.processor.ProcessContext;
 +import org.apache.nifi.processor.ProcessSession;
 +import org.apache.nifi.processor.ProcessorInitializationContext;
 +import org.apache.nifi.processor.Relationship;
 +import org.apache.nifi.processor.exception.ProcessException;
 +import org.apache.nifi.processor.io.OutputStreamCallback;
 +import org.apache.nifi.processor.util.StandardValidators;
 +import org.apache.nifi.processors.standard.util.SyslogEvent;
 +import org.apache.nifi.processors.standard.util.SyslogParser;
 +import org.apache.nifi.stream.io.ByteArrayOutputStream;
++
 +import org.apache.nifi.util.file.FileUtils;
 +
++
  @Tags({"syslog", "listen", "udp", "tcp", "logs"})
  @CapabilityDescription("Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular " +
          "expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) " +
@@@ -108,14 -104,8 +112,13 @@@ public class ListenSyslog extends Abstr
              .defaultValue("1 MB")
              .required(true)
              .build();
 -
 -
 +    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
-     .name("Max number of TCP connections")
-     .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
-     .addValidator(StandardValidators.createLongValidator(1, 65535, true))
-     .defaultValue("2")
-     .required(true)
-     .build();
- 
++            .name("Max number of TCP connections")
++            .description("The maximum number of concurrent connections to accept syslog messages in TCP mode")
++            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
++            .defaultValue("2")
++            .required(true)
++            .build();
      public static final Relationship REL_SUCCESS = new Relationship.Builder()
              .name("success")
              .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.")
@@@ -178,17 -168,9 +181,17 @@@
          final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
          final String protocol = context.getProperty(PROTOCOL).getValue();
          final String charSet = context.getProperty(CHARSET).getValue();
 +        final int maxConnections; 
 +        
 +        if (protocol.equals(UDP_VALUE)) {
 +            maxConnections = 1;
-         }
-         else{
++        } else {
 +            maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
 +        }
-         
+ 
          parser = new SyslogParser(Charset.forName(charSet));
 -        bufferPool = new BufferPool(context.getMaxConcurrentTasks(), bufferSize, false, Integer.MAX_VALUE);
 +        bufferPool = new BufferPool(maxConnections, bufferSize, false, Integer.MAX_VALUE);
++
          syslogEvents = new LinkedBlockingQueue<>(10);
          errorEvents = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
  
@@@ -332,37 -312,33 +336,38 @@@
  
          @Override
          public void run() {
 +            final ByteBuffer buffer = bufferPool.poll();
              while (!stopped) {
 -                final ByteBuffer buffer = bufferPool.poll();
                  try {
 -                    if (buffer == null) {
 -                        Thread.sleep(10L);
 -                        logger.debug("no available buffers, continuing...");
 -                        continue;
 -                    }
 -
 -                    final SocketAddress sender = datagramChannel.receive(buffer);
 -                    if (sender == null) {
 -                        Thread.sleep(1000L); // nothing to do so wait...
 -                    } else {
 -                        final SyslogEvent event = syslogParser.parseEvent(buffer); // TODO parse with sender?
 -                        logger.trace(event.getFullMessage());
 -                        syslogEvents.put(event); // block until space is available
 +                    int selected = selector.select();
 +                    if (selected > 0){
 +                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
 +                        while (selectorKeys.hasNext()){
 +                            SelectionKey key = selectorKeys.next();
 +                            selectorKeys.remove();
 +                            if (!key.isValid()){
 +                                continue;
 +                            }
 +                            DatagramChannel channel = (DatagramChannel) key.channel();
 +                            SocketAddress sender;
 +                            buffer.clear();
 +                            while (!stopped && (sender = channel.receive(buffer)) != null) {
++                                // TODO: Add sender address
 +                                final SyslogEvent event = syslogParser.parseEvent(buffer);
 +                                logger.trace(event.getFullMessage());
 +                                syslogEvents.put(event); // block until space is available
 +                            }
 +                        }
                      }
                  } catch (InterruptedException e) {
 -                    stop();
 +                    stopped = true;
                  } catch (IOException e) {
                      logger.error("Error reading from DatagramChannel", e);
 -                }  finally {
 -                    if (buffer != null) {
 -                        bufferPool.returnBuffer(buffer, 0);
 -                    }
                  }
              }
 +            if (buffer != null) {
 +                bufferPool.returnBuffer(buffer, 0);
 +            }
          }
  
          @Override
@@@ -482,8 -421,6 +487,7 @@@
  
          @Override
          public void stop() {
 +            selector.wakeup();
-             
              stopped = true;
          }
  
@@@ -504,15 -441,6 +508,14 @@@
              }
          }
  
 +        public void completeConnection(SelectionKey key) {
 +            bufferPool.returnBuffer((ByteBuffer) key.attachment(), 0);
 +        }
 +
 +        public void addBackForSelection(SelectionKey key) {
 +            keyQueue.offer(key);
 +            selector.wakeup();
 +        }
- 
      }
  
      /**
@@@ -520,9 -448,9 +523,8 @@@
       * processing, otherwise the buffer is returned to the buffer pool.
       */
      public static class SocketChannelHandler implements Runnable {
--
 -        private final BufferPool bufferPool;
 -        private final SocketChannel socketChannel;
 +        private final SelectionKey key;
 +        private final SocketChannelReader dispatcher;
          private final SyslogParser syslogParser;
          private final BlockingQueue<SyslogEvent> syslogEvents;
          private final ProcessorLog logger;
@@@ -539,52 -467,52 +541,54 @@@
  
          @Override
          public void run() {
 -            try {
 -                int bytesRead = 0;
 -                while (bytesRead >= 0 && !Thread.interrupted()) {
 -
 -                    final ByteBuffer buffer = bufferPool.poll();
 -                    if (buffer == null) {
 -                        Thread.sleep(10L);
 -                        logger.debug("no available buffers, continuing...");
 -                        continue;
 -                    }
+ 
 -                    try {
 -                        // read until the buffer is full
 -                        bytesRead = socketChannel.read(buffer);
 -                        while (bytesRead > 0) {
 -                            bytesRead = socketChannel.read(buffer);
 -                        }
 -                        buffer.flip();
 -
 -                        // go through the buffer looking for the end of each message
 -                        int bufferLength = buffer.limit();
 -                        for (int i = 0; i < bufferLength; i++) {
 -                            byte currByte = buffer.get(i);
 -                            currBytes.write(currByte);
 -
 -                            // at the end of a message so parse an event, reset the buffer, and break out of the loop
 -                            if (currByte == '\n') {
 -                                final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
 -                                        socketChannel.socket().getInetAddress().toString());
 -                                logger.trace(event.getFullMessage());
 -                                syslogEvents.put(event); // block until space is available
 -                                currBytes.reset();
 -                            }
 +            boolean eof = false;
 +            SocketChannel socketChannel = null;
 +            ByteBuffer socketBuffer = null;
 +            try {
 +                int bytesRead;
 +                socketChannel = (SocketChannel) key.channel();
 +                socketBuffer = (ByteBuffer) key.attachment();
 +                // read until the buffer is full
 +                while((bytesRead = socketChannel.read(socketBuffer)) > 0){
 +                    socketBuffer.flip();
 +                    socketBuffer.mark();
 +                    int total = socketBuffer.remaining();
 +                    // go through the buffer looking for the end of each message
 +                    for (int i = 0; i < total; i++) {
 +                        byte currByte = socketBuffer.get();
 +                        currBytes.write(currByte);
 +                        // at the end of a message so parse an event, reset the buffer, and break out of the loop
 +                        if (currByte == '\n') {
-                             final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
++                            final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray(),
++                                    socketChannel.socket().getInetAddress().toString());
 +                            logger.trace(event.getFullMessage());
 +                            syslogEvents.put(event); // block until space is available
 +                            currBytes.reset();
 +                            socketBuffer.mark();
                          }
 -                    } finally {
 -                        bufferPool.returnBuffer(buffer, 0);
                      }
 +                    socketBuffer.reset();
 +                    socketBuffer.compact();
 +                    logger.debug("done handling SocketChannel");
 +                }
 +                if( bytesRead < 0 ){
 +                    eof = true;
                  }
 -
 -                logger.debug("done handling SocketChannel");
              } catch (ClosedByInterruptException | InterruptedException e) {
 -                // nothing to do here
 +                logger.debug("read loop interrupted, closing connection");
 +                eof = true;
              } catch (IOException e) {
                  logger.error("Error reading from channel", e);
 +                eof = true;
              } finally {
 -                IOUtils.closeQuietly(socketChannel);
 +                if(eof == true){
 +                    dispatcher.completeConnection(key);
 +                    IOUtils.closeQuietly(socketChannel);
 +                }
 +                else {
 +                    dispatcher.addBackForSelection(key);
 +                }
              }
          }