You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/06/10 11:32:30 UTC

[karaf-decanter] branch master updated: [KARAF-6699] Add connected config on the socket appender

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new 7916747  [KARAF-6699] Add connected config on the socket appender
     new 1703c4e  Merge pull request #184 from jbonofre/KARAF-6699
7916747 is described below

commit 7916747f6f584515a41d6c8e3eb8657b3c204985
Author: jbonofre <jb...@apache.org>
AuthorDate: Wed Jun 10 06:42:16 2020 +0200

    [KARAF-6699] Add connected config on the socket appender
---
 appender/socket/pom.xml                            |   7 +
 .../org.apache.karaf.decanter.appender.socket.cfg  |   6 +
 .../decanter/appender/socket/SocketAppender.java   |  82 +++++++++---
 .../appender/socket/SocketAppenderTest.java        | 147 +++++++++++++++++++++
 manual/src/main/asciidoc/user-guide/appenders.adoc |  11 ++
 5 files changed, 233 insertions(+), 20 deletions(-)

diff --git a/appender/socket/pom.xml b/appender/socket/pom.xml
index 78bbc43..ca64d00 100644
--- a/appender/socket/pom.xml
+++ b/appender/socket/pom.xml
@@ -42,6 +42,13 @@
             <groupId>org.apache.karaf.decanter.appender</groupId>
             <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
         </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.karaf.decanter.marshaller</groupId>
+            <artifactId>org.apache.karaf.decanter.marshaller.csv</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg b/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
index 9686d4f..53fa462 100644
--- a/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
+++ b/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
@@ -25,5 +25,11 @@
 # Port number where to send the collected data
 #port=34343
 
+# If connected is true, the socket connection is created when the appender starts and
+# collected data are "streamed" to the socket.
+# If connected is false (default), a new socket connection is created for each
+# event sent to the socket, and closed once that event has been written.
+#connected=false
+
 # Marshaller to use
 marshaller.target=(dataFormat=json)
\ No newline at end of file
diff --git a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
index 4989206..a9505b3 100644
--- a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
+++ b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
@@ -26,6 +26,7 @@ import org.osgi.service.event.EventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.xml.bind.annotation.XmlType;
 import java.io.PrintWriter;
 import java.net.Socket;
 import java.util.Dictionary;
@@ -40,9 +41,11 @@ public class SocketAppender implements EventHandler {
 
     public static final String HOST_PROPERTY = "host";
     public static final String PORT_PROPERTY = "port";
+    public static final String CONNECTED_PROPERTY = "connected";
 
     public static final String HOST_DEFAULT = "localhost";
     public static final String PORT_DEFAULT = "34343";
+    public static final String CONNECTED_DEFAULT = "false";
 
     @Reference
     public Marshaller marshaller;
@@ -51,37 +54,76 @@ public class SocketAppender implements EventHandler {
 
     private Dictionary<String, Object> config;
 
+    private Socket socket;
+    private PrintWriter writer;
+
     @Activate
     public void activate(ComponentContext componentContext) throws Exception {
-        this.config = componentContext.getProperties();
+        activate(componentContext.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> config) throws Exception {
+        this.config = config;
+        boolean connected = Boolean.parseBoolean(getValue(config, CONNECTED_PROPERTY, CONNECTED_DEFAULT));
+        if (connected) {
+            try {
+                initConnection();
+            } catch (Exception e) {
+                LOGGER.error("Can't create socket", e);
+                throw e;
+            }
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        closeConnection();
+    }
+
+    private void initConnection() throws Exception {
+        socket = new Socket(
+                getValue(config, HOST_PROPERTY, HOST_DEFAULT),
+                Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
+        writer = new PrintWriter(socket.getOutputStream(), true);
+    }
+
+    // Closes writer and socket, then clears both so handleEvent() reconnects for the next event.
+    private void closeConnection() {
+        if (writer != null) {
+            writer.close(); // PrintWriter.close() swallows I/O errors, no try/catch needed
+            writer = null;
+        }
+        if (socket != null) {
+            try {
+                socket.close();
+            } catch (Exception e) {
+                // nothing to do
+            } finally {
+                socket = null;
+            }
+        }
+    }
 
     @Override
     public void handleEvent(Event event) {
         if (EventFilter.match(event, config)) {
-            Socket socket = null;
-            PrintWriter writer = null;
+            String data = marshaller.marshal(event);
+
+            boolean connected = Boolean.parseBoolean(getValue(config, CONNECTED_PROPERTY, CONNECTED_DEFAULT));
+
             try {
-                socket = new Socket(
-                        getValue(config, HOST_PROPERTY, HOST_DEFAULT),
-                        Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
-                String data = marshaller.marshal(event);
-                writer = new PrintWriter(socket.getOutputStream(), true);
+
+                if (!connected && socket == null) {
+                    initConnection();
+                }
+
                 writer.println(data);
+
+                if (!connected) {
+                    closeConnection();
+                }
             } catch (Exception e) {
                 LOGGER.warn("Error sending data on the socket", e);
-            } finally {
-                if (writer != null) {
-                    writer.flush();
-                    writer.close();
-                }
-                if (socket != null) {
-                    try {
-                        socket.close();
-                    } catch (Exception e) {
-                        // nothing to do
-                    }
-                }
             }
         }
     }
diff --git a/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java b/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java
new file mode 100644
index 0000000..0c59b59
--- /dev/null
+++ b/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.socket;
+
+import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.marshaller.csv.CsvMarshaller;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.*;
+
+public class SocketAppenderTest {
+
+    @Test(timeout = 60000L)
+    public void testNotConnected() throws Exception {
+        SocketAppender appender = new SocketAppender();
+        Marshaller marshaller = new CsvMarshaller();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("connected", "false");
+        config.put("host", "localhost");
+        config.put("port", "44445");
+        appender.activate(config);
+
+        // no exception here: in non-connected mode the socket is only opened when a message is sent
+
+        final List<String> received = new ArrayList<>();
+
+        Runnable server = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ServerSocket server = new ServerSocket(44445);
+                    while (true) {
+                        Socket socket = server.accept();
+                        try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+                            String line;
+                            while ((line = reader.readLine()) != null) {
+                                received.add(line);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+          }
+        };
+        Thread serverThread = new Thread(server);
+        serverThread.start();
+
+        Map<String, String> data = new HashMap<>();
+        data.put("type", "test");
+        data.put("first", "1");
+        appender.handleEvent(new Event("test", data));
+
+        while (received.size() != 1) {
+            Thread.sleep(200);
+        }
+
+        Assert.assertEquals(1, received.size());
+        Assert.assertEquals("type=test,first=1,event.topics=test", received.get(0));
+
+        serverThread.interrupt();
+    }
+
+    @Test
+    public void testConnected() throws Exception {
+        SocketAppender appender = new SocketAppender();
+        Marshaller marshaller = new CsvMarshaller();
+        appender.marshaller = marshaller;
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("connected", "true");
+        config.put("host", "localhost");
+        config.put("port", "44444");
+        try {
+            appender.activate(config);
+            Assert.fail("Expect ConnectionRefused exception here");
+        } catch (Exception e) {
+            // expected
+        }
+
+        // start a server on the target port so that the second activate() below can connect
+
+        final List<String> received = new ArrayList<>();
+
+        Runnable server = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ServerSocket server = new ServerSocket(44444);
+                    while (true) {
+                        Socket socket = server.accept();
+                        try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+                            String line;
+                            while ((line = reader.readLine()) != null) {
+                                received.add(line);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        Thread serverThread = new Thread(server);
+        serverThread.start();
+
+        // gives time to server thread to start
+        Thread.sleep(500);
+
+        appender.activate(config);
+
+        Map<String, String> data = new HashMap<>();
+        data.put("type", "test");
+        data.put("first", "1");
+        appender.handleEvent(new Event("test", data));
+
+        while (received.size() != 1) {
+            Thread.sleep(200);
+        }
+
+        Assert.assertEquals(1, received.size());
+        Assert.assertEquals("type=test,first=1,event.topics=test", received.get(0));
+
+        serverThread.interrupt();
+    }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/appenders.adoc b/manual/src/main/asciidoc/user-guide/appenders.adoc
index 203e967..639ee30 100644
--- a/manual/src/main/asciidoc/user-guide/appenders.adoc
+++ b/manual/src/main/asciidoc/user-guide/appenders.adoc
@@ -534,10 +534,21 @@ containing:
 
 # Port number where to send the collected data
 #port=34343
+
+# If connected is true, the socket connection is created when the appender starts and
+# collected data are "streamed" to the socket.
+# If connected is false (default), a new socket connection is created for each
+# event sent to the socket, and closed once that event has been written.
+#connected=false
+
+# Marshaller to use
+marshaller.target=(dataFormat=json)
 ----
 
 * the `host` property contains the hostname or IP address of the remote network socket collector
 * the `port` property contains the port number of the remote network socket collector
+* the `connected` property defines whether the socket connection is created once when the appender starts (`true`) or anew for each data event (`false`, the default).
+* the `marshaller.target` property defines the data format to use.
 
 ==== OrientDB