You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/18 17:13:54 UTC

[camel] branch main updated: CAMEL-18124: camel-stream - Add readLine option to control read mode

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e0f7e545f0 CAMEL-18124: camel-stream - Add readLine option to control read mode
6e0f7e545f0 is described below

commit 6e0f7e545f03c57c40a21dd8a3549a0678b23e5c
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed May 18 19:13:38 2022 +0200

    CAMEL-18124: camel-stream - Add readLine option to control read mode
---
 .../component/stream/StreamEndpointConfigurer.java |   6 ++
 .../component/stream/StreamEndpointUriFactory.java |   3 +-
 .../org/apache/camel/component/stream/stream.json  |   1 +
 .../camel/component/stream/StreamConsumer.java     | 105 +++++++++++++++++++--
 .../camel/component/stream/StreamEndpoint.java     |  14 +++
 5 files changed, 122 insertions(+), 7 deletions(-)

diff --git a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
index 1ba726b3524..5a937b7daa0 100644
--- a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
+++ b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
@@ -51,6 +51,8 @@ public class StreamEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "promptDelay": target.setPromptDelay(property(camelContext, long.class, value)); return true;
         case "promptmessage":
         case "promptMessage": target.setPromptMessage(property(camelContext, java.lang.String.class, value)); return true;
+        case "readline":
+        case "readLine": target.setReadLine(property(camelContext, boolean.class, value)); return true;
         case "readtimeout":
         case "readTimeout": target.setReadTimeout(property(camelContext, int.class, value)); return true;
         case "retry": target.setRetry(property(camelContext, boolean.class, value)); return true;
@@ -95,6 +97,8 @@ public class StreamEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "promptDelay": return long.class;
         case "promptmessage":
         case "promptMessage": return java.lang.String.class;
+        case "readline":
+        case "readLine": return boolean.class;
         case "readtimeout":
         case "readTimeout": return int.class;
         case "retry": return boolean.class;
@@ -140,6 +144,8 @@ public class StreamEndpointConfigurer extends PropertyConfigurerSupport implemen
         case "promptDelay": return target.getPromptDelay();
         case "promptmessage":
         case "promptMessage": return target.getPromptMessage();
+        case "readline":
+        case "readLine": return target.isReadLine();
         case "readtimeout":
         case "readTimeout": return target.getReadTimeout();
         case "retry": return target.isRetry();
diff --git a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
index 220146fe129..d16e85ed5a1 100644
--- a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
+++ b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class StreamEndpointUriFactory extends org.apache.camel.support.component
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(21);
+        Set<String> props = new HashSet<>(22);
         props.add("appendNewLine");
         props.add("autoCloseCount");
         props.add("bridgeErrorHandler");
@@ -39,6 +39,7 @@ public class StreamEndpointUriFactory extends org.apache.camel.support.component
         props.add("lazyStartProducer");
         props.add("promptDelay");
         props.add("promptMessage");
+        props.add("readLine");
         props.add("readTimeout");
         props.add("retry");
         props.add("scanStream");
diff --git a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
index 66aedc4935c..7fa557d5060 100644
--- a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
+++ b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
@@ -41,6 +41,7 @@
     "initialPromptDelay": { "kind": "parameter", "displayName": "Initial Prompt Delay", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2000, "description": "Initial delay in milliseconds before showing the message prompt. This delay occurs only once. Can be used during system startup to avoid message prompts being written while other logging is done to the system [...]
     "promptDelay": { "kind": "parameter", "displayName": "Prompt Delay", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Optional delay in milliseconds before showing the message prompt." },
     "promptMessage": { "kind": "parameter", "displayName": "Prompt Message", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Message prompt to use when reading from stream:in; for example, you could set this to Enter a command:" },
+    "readLine": { "kind": "parameter", "displayName": "Read Line", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether to read the input stream in line mode (terminated by line breaks). Setting this to false will instead read the entire stream until end of stream (EOF)." },
     "retry": { "kind": "parameter", "displayName": "Retry", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will retry opening the stream if it's overwritten, somewhat like tail --retry If reading from files then you should also enable the fileWatcher option, to make it work reliable." },
     "scanStream": { "kind": "parameter", "displayName": "Scan Stream", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "To be used for continuously reading a stream such as the unix tail command." },
     "scanStreamDelay": { "kind": "parameter", "displayName": "Scan Stream Delay", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "description": "Delay in milliseconds between read attempts when using scanStream." },
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index a88886a9ce1..68e3c87a5bd 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -89,7 +89,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
 
         // if we scan the stream we are lenient and can wait for the stream to be available later
         if (!endpoint.isScanStream()) {
-            initializeStream();
+            initializeStreamLineMode();
         }
 
         executor = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
@@ -119,7 +119,11 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
     @Override
     public void run() {
         try {
-            readFromStream();
+            if (endpoint.isReadLine()) {
+                readFromStreamLineMode();
+            } else {
+                readFromStreamRawMode();
+            }
         } catch (InterruptedException e) {
             // we are closing down so ignore
         } catch (Exception e) {
@@ -127,7 +131,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
         }
     }
 
-    private BufferedReader initializeStream() throws Exception {
+    private BufferedReader initializeStreamLineMode() throws Exception {
         // close old stream, before obtaining a new stream
         IOHelper.close(inputStreamToClose);
 
@@ -147,10 +151,90 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
         }
     }
 
-    private void readFromStream() throws Exception {
+    /**
+     * Prepares the input stream used for raw (byte based) reading.
+     * <p/>
+     * Any stream registered for closing by a previous initialization is closed first. Only the <tt>in</tt> and
+     * <tt>file</tt> kinds assign a new stream; for any other kind the current stream is returned as-is.
+     *
+     * @return the stream to read from (whatever the inputStream field holds after initialization)
+     */
+    private InputStream initializeStreamRawMode() throws Exception {
+        // the old stream must be released before a new one is obtained
+        IOHelper.close(inputStreamToClose);
+
+        if ("in".equals(uri)) {
+            // System.in is not registered for closing
+            inputStream = System.in;
+            inputStreamToClose = null;
+            return inputStream;
+        }
+
+        if ("file".equals(uri)) {
+            // a file stream is registered so the next initialization closes it
+            inputStream = resolveStreamFromFile();
+            inputStreamToClose = inputStream;
+        }
+        return inputStream;
+    }
+
+    /**
+     * Reads the stream in raw mode, where each successful non-empty read (all bytes available until end of
+     * stream) is sent to the processor as a single <tt>byte[]</tt> message via {@link #processRaw(byte[], long)}.
+     * <p/>
+     * When scanning the stream, reading is repeated for as long as the consumer is allowed to run, sleeping for the
+     * scan delay whenever there was nothing to read. If retry is enabled the stream is re-opened when there is no
+     * more input (and, with the file watcher enabled, only when the file was reported as changed). Otherwise the
+     * stream is read once until end of stream.
+     * <p/>
+     * A missing stream or a failed read is handled the same way as end of stream.
+     */
+    private void readFromStreamRawMode() throws Exception {
+        long index = 0;
+        InputStream is = initializeStreamRawMode();
+
+        if (endpoint.isScanStream()) {
+            // repeat scanning from stream
+            while (isRunAllowed()) {
+                byte[] data = readRawData(is);
+                boolean eos = data == null || data.length == 0;
+
+                if (!eos) {
+                    index = processRaw(data, index);
+                } else if (isRunAllowed() && endpoint.isRetry()) {
+                    // only re-open when there is no more input, otherwise the same content
+                    // would be re-read from the beginning over and over without any delay
+                    boolean reOpen = true;
+                    if (endpoint.isFileWatcher()) {
+                        reOpen = watchFileChanged;
+                    }
+                    if (reOpen) {
+                        LOG.debug("File: {} changed/rollover, re-reading file from beginning", file);
+                        is = initializeStreamRawMode();
+                        // we have re-initialized the stream so lower changed flag
+                        if (endpoint.isFileWatcher()) {
+                            watchFileChanged = false;
+                        }
+                    } else {
+                        LOG.trace("File: {} not changed since last read", file);
+                    }
+                }
+
+                // sleep only if there is no input
+                if (eos) {
+                    try {
+                        Thread.sleep(endpoint.getScanStreamDelay());
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            }
+        } else {
+            // regular read stream once until end of stream
+            boolean eos = false;
+            while (!eos && isRunAllowed()) {
+                if (endpoint.getPromptMessage() != null) {
+                    doPromptMessage();
+                }
+
+                // the data is obtained fresh on every iteration so a failed read
+                // ends the loop instead of sending the previous data once more
+                byte[] data = readRawData(is);
+                eos = data == null || data.length == 0;
+                if (!eos) {
+                    index = processRaw(data, index);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads all remaining bytes from the given stream.
+     *
+     * @param  is the stream to read from, may be <tt>null</tt> if no stream is available
+     * @return    the bytes read, or <tt>null</tt> if there is no stream or the read failed
+     */
+    private byte[] readRawData(InputStream is) {
+        if (is == null) {
+            return null;
+        }
+        try {
+            return is.readAllBytes();
+        } catch (IOException e) {
+            // best effort: the caller handles a failed read as end of stream
+            LOG.debug("Error reading from stream, handling as end of stream", e);
+            return null;
+        }
+    }
+
+    private void readFromStreamLineMode() throws Exception {
         long index = 0;
         String line;
-        BufferedReader br = initializeStream();
+        BufferedReader br = initializeStreamLineMode();
 
         if (endpoint.isScanStream()) {
             // repeat scanning from stream
@@ -171,7 +255,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
                     }
                     if (reOpen) {
                         LOG.debug("File: {} changed/rollover, re-reading file from beginning", file);
-                        br = initializeStream();
+                        br = initializeStreamLineMode();
                         // we have re-initialized the stream so lower changed flag
                         if (endpoint.isFileWatcher()) {
                             watchFileChanged = false;
@@ -254,6 +338,15 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
         return index;
     }
 
+    /**
+     * Strategy method for processing raw data read from the stream.
+     * <p/>
+     * Creates an exchange for the given data and index, and sends it to the processor.
+     *
+     * @param  body  the raw data to use as message body
+     * @param  index the index of this message
+     * @return       the index to use for the next message (the given index plus one)
+     */
+    protected synchronized long processRaw(byte[] body, long index) throws Exception {
+        // NOTE(review): the last argument is always true here, presumably flagging the exchange
+        // as the last/complete one - confirm against createExchange
+        final long nextIndex = index + 1;
+        final Exchange exchange = createExchange(body, index, true);
+        getProcessor().process(exchange);
+        return nextIndex;
+    }
+
     /**
      * Strategy method for prompting the prompt message
      */
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index 56d82cd34ea..8ca1b54341d 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -69,6 +69,8 @@ public class StreamEndpoint extends DefaultEndpoint {
     private long initialPromptDelay = 2000;
     @UriParam(label = "consumer")
     private int groupLines;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean readLine = true;
     @UriParam(label = "producer", defaultValue = "true")
     private boolean appendNewLine = true;
     @UriParam(label = "producer")
@@ -262,6 +264,18 @@ public class StreamEndpoint extends DefaultEndpoint {
         this.groupLines = groupLines;
     }
 
+    /**
+     * Whether the input stream is read in line mode.
+     */
+    public boolean isReadLine() {
+        return this.readLine;
+    }
+
+    /**
+     * Whether to read the input stream in line mode (terminated by line breaks). Setting this to false will instead
+     * read the entire stream until end of stream (EOF).
+     */
+    public void setReadLine(boolean readLine) {
+        this.readLine = readLine;
+    }
+
     public int getAutoCloseCount() {
         return autoCloseCount;
     }