You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/09 02:22:03 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #5493: NIFI-8806 - Refactoring ListenTCP to use netty.

exceptionfactory commented on a change in pull request #5493:
URL: https://github.com/apache/nifi/pull/5493#discussion_r745232775



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -87,122 +96,200 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    // Deprecated
     public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
             .name("max-receiving-threads")
             .displayName("Max Number of Receiving Message Handler Threads")
             .description(
-                    "The maximum number of threads might be available for handling receiving messages ready all the time. " +
-                    "Cannot be bigger than the \"Max Number of TCP Connections\". " +
-                    "If not set, the value of \"Max Number of TCP Connections\" will be used.")
+                    "This property is deprecated and no longer used.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .required(false)
             .build();
 
+    // Deprecated
     protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
             .name("pool-receive-buffers")
             .displayName("Pool Receive Buffers")
             .description(
-                    "When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
-                    "This is prepared during initialisation of the processor. " +
-                    "With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
-                    "When turned off, the byte buffers will be created on demand and be destroyed after use.")
-            .required(true)
+                    "This property is deprecated and no longer used.")
+            .required(false)
             .defaultValue("True")
             .allowableValues("True", "False")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
-    }
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        // Deprecated
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        // Deprecated
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        // Deprecated
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);

Review comment:
       This property should be moved to the first position so retain the order from the current version.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -87,122 +96,200 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    // Deprecated
     public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
             .name("max-receiving-threads")
             .displayName("Max Number of Receiving Message Handler Threads")
             .description(
-                    "The maximum number of threads might be available for handling receiving messages ready all the time. " +
-                    "Cannot be bigger than the \"Max Number of TCP Connections\". " +
-                    "If not set, the value of \"Max Number of TCP Connections\" will be used.")
+                    "This property is deprecated and no longer used.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .required(false)
             .build();
 
+    // Deprecated
     protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
             .name("pool-receive-buffers")
             .displayName("Pool Receive Buffers")
             .description(
-                    "When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
-                    "This is prepared during initialisation of the processor. " +
-                    "With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
-                    "When turned off, the byte buffers will be created on demand and be destroyed after use.")
-            .required(true)
+                    "This property is deprecated and no longer used.")
+            .required(false)
             .defaultValue("True")
             .allowableValues("True", "False")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
-    }
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        // Deprecated
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        // Deprecated
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        // Deprecated
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
 
-        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
-        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
 
-        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
-            results.add(new ValidationResult.Builder()
-                    .explanation("Client Auth must be provided when using TLS/SSL")
-                    .valid(false).subject("Client Auth").build());
-        }
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);

Review comment:
       Recommend renaming to `interfaceAddress`:
   ```suggestion
           InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(networkInterface);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -87,122 +96,200 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    // Deprecated
     public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
             .name("max-receiving-threads")
             .displayName("Max Number of Receiving Message Handler Threads")
             .description(
-                    "The maximum number of threads might be available for handling receiving messages ready all the time. " +
-                    "Cannot be bigger than the \"Max Number of TCP Connections\". " +
-                    "If not set, the value of \"Max Number of TCP Connections\" will be used.")
+                    "This property is deprecated and no longer used.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .required(false)
             .build();
 
+    // Deprecated
     protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
             .name("pool-receive-buffers")
             .displayName("Pool Receive Buffers")
             .description(
-                    "When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
-                    "This is prepared during initialisation of the processor. " +
-                    "With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
-                    "When turned off, the byte buffers will be created on demand and be destroyed after use.")
-            .required(true)
+                    "This property is deprecated and no longer used.")
+            .required(false)
             .defaultValue("True")
             .allowableValues("True", "False")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
-    }
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        // Deprecated
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        // Deprecated
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        // Deprecated
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);

Review comment:
       The order of these two properties should be reversed to retain the current order.
   ```suggestion
           descriptors.add(SSL_CONTEXT_SERVICE);
           descriptors.add(CLIENT_AUTH);
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -87,122 +96,200 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    // Deprecated
     public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new PropertyDescriptor.Builder()
             .name("max-receiving-threads")
             .displayName("Max Number of Receiving Message Handler Threads")
             .description(
-                    "The maximum number of threads might be available for handling receiving messages ready all the time. " +
-                    "Cannot be bigger than the \"Max Number of TCP Connections\". " +
-                    "If not set, the value of \"Max Number of TCP Connections\" will be used.")
+                    "This property is deprecated and no longer used.")
             .addValidator(StandardValidators.createLongValidator(1, 65535, true))
             .required(false)
             .build();
 
+    // Deprecated
     protected static final PropertyDescriptor POOL_RECV_BUFFERS = new PropertyDescriptor.Builder()
             .name("pool-receive-buffers")
             .displayName("Pool Receive Buffers")
             .description(
-                    "When turned on, the processor uses pre-populated pool of buffers when receiving messages. " +
-                    "This is prepared during initialisation of the processor. " +
-                    "With high value of Max Number of TCP Connections and Receive Buffer Size this strategy might allocate significant amount of memory! " +
-                    "When turned off, the byte buffers will be created on demand and be destroyed after use.")
-            .required(true)
+                    "This property is deprecated and no longer used.")
+            .required(false)
             .defaultValue("True")
             .allowableValues("True", "False")
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(
-                MAX_CONNECTIONS,
-                MAX_RECV_THREAD_POOL_SIZE,
-                POOL_RECV_BUFFERS,
-                SSL_CONTEXT_SERVICE,
-                CLIENT_AUTH
-        );
-    }
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
+
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile BlockingQueue<ByteArrayMessage> events;
+    protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        // Deprecated
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        // Deprecated
+        descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+        // Deprecated
+        descriptors.add(POOL_RECV_BUFFERS);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        this.descriptors = Collections.unmodifiableList(descriptors);
 
-        final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
-        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
 
-        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
-            results.add(new ValidationResult.Builder()
-                    .explanation("Client Auth must be provided when using TLS/SSL")
-                    .valid(false).subject("Client Auth").build());
-        }
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = new ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port, TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
 
-        final int maxConnections = validationContext.getProperty(MAX_CONNECTIONS).asInteger();
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
+            ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+            SSLContext sslContext = sslContextService.createContext();
+            eventFactory.setSslContext(sslContext);
+            eventFactory.setClientAuth(clientAuth);
+        }
 
-        if (validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()) {
-            final int maxPoolSize = validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger();
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
 
-            if (maxPoolSize > maxConnections) {
-                results.add(new ValidationResult.Builder()
-                        .explanation("\"" + MAX_RECV_THREAD_POOL_SIZE.getDisplayName() + "\" cannot be bigger than \"" + MAX_CONNECTIONS.getDisplayName() + "\"")
-                        .valid(false)
-                        .subject(MAX_RECV_THREAD_POOL_SIZE.getDisplayName())
-                        .build());
-            }
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);

Review comment:
       This should log the exception to include the stack trace:
   ```suggestion
               getLogger().error("Failed to start server on [{}:{}]", hostname.getHostAddress(), port, e);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org