You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/06/10 11:32:30 UTC
[karaf-decanter] branch master updated: [KARAF-6699] Add connected
config on the socket appender
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new 7916747 [KARAF-6699] Add connected config on the socket appender
new 1703c4e Merge pull request #184 from jbonofre/KARAF-6699
7916747 is described below
commit 7916747f6f584515a41d6c8e3eb8657b3c204985
Author: jbonofre <jb...@apache.org>
AuthorDate: Wed Jun 10 06:42:16 2020 +0200
[KARAF-6699] Add connected config on the socket appender
---
appender/socket/pom.xml | 7 +
.../org.apache.karaf.decanter.appender.socket.cfg | 6 +
.../decanter/appender/socket/SocketAppender.java | 82 +++++++++---
.../appender/socket/SocketAppenderTest.java | 147 +++++++++++++++++++++
manual/src/main/asciidoc/user-guide/appenders.adoc | 11 ++
5 files changed, 233 insertions(+), 20 deletions(-)
diff --git a/appender/socket/pom.xml b/appender/socket/pom.xml
index 78bbc43..ca64d00 100644
--- a/appender/socket/pom.xml
+++ b/appender/socket/pom.xml
@@ -42,6 +42,13 @@
<groupId>org.apache.karaf.decanter.appender</groupId>
<artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
</dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.apache.karaf.decanter.marshaller</groupId>
+ <artifactId>org.apache.karaf.decanter.marshaller.csv</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg b/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
index 9686d4f..53fa462 100644
--- a/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
+++ b/appender/socket/src/main/cfg/org.apache.karaf.decanter.appender.socket.cfg
@@ -25,5 +25,11 @@
# Port number where to send the collected data
#port=34343
+# If connected is true, a single socket connection is opened when the appender starts and
+# all collected data is streamed over that connection.
+# If connected is false (default), a new socket connection is opened for each event
+# and closed as soon as the event has been sent.
+#connected=false
+
# Marshaller to use
marshaller.target=(dataFormat=json)
\ No newline at end of file
diff --git a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
index 4989206..a9505b3 100644
--- a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
+++ b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
@@ -26,6 +26,7 @@ import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.xml.bind.annotation.XmlType;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Dictionary;
@@ -40,9 +41,11 @@ public class SocketAppender implements EventHandler {
public static final String HOST_PROPERTY = "host";
public static final String PORT_PROPERTY = "port";
+ public static final String CONNECTED_PROPERTY = "connected";
public static final String HOST_DEFAULT = "localhost";
public static final String PORT_DEFAULT = "34343";
+ public static final String CONNECTED_DEFAULT = "false";
@Reference
public Marshaller marshaller;
@@ -51,37 +54,76 @@ public class SocketAppender implements EventHandler {
private Dictionary<String, Object> config;
+ private Socket socket;
+ private PrintWriter writer;
+ // Called on component activation (@Activate): forwards the component properties to activate(Dictionary).
@Activate
public void activate(ComponentContext componentContext) throws Exception {
- this.config = componentContext.getProperties();
+ activate(componentContext.getProperties());
+ }
+ // Stores the configuration; when "connected" parses to true, opens the socket now and logs + rethrows any failure.
+ public void activate(Dictionary<String, Object> config) throws Exception {
+ this.config = config;
+ boolean connected = Boolean.parseBoolean(getValue(config, CONNECTED_PROPERTY, CONNECTED_DEFAULT));
+ if (connected) {
+ try {
+ initConnection();
+ } catch (Exception e) {
+ LOGGER.error("Can't create socket", e);
+ throw e;
+ }
+ }
+ }
+ // Called on component deactivation (@Deactivate): closes the writer and socket if they were opened.
+ @Deactivate
+ public void deactivate() {
+ closeConnection();
+ }
+ // Opens a socket to the host/port read via getValue (HOST_DEFAULT/PORT_DEFAULT passed as defaults) and wraps its output in an auto-flushing PrintWriter.
+ private void initConnection() throws Exception {
+ socket = new Socket(
+ getValue(config, HOST_PROPERTY, HOST_DEFAULT),
+ Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
+ writer = new PrintWriter(socket.getOutputStream(), true);
+ }
+ // Best-effort close of the writer, then the socket; errors are ignored and the fields are left non-null afterwards.
+ private void closeConnection() {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (Exception e) {
+ // nothing to do
+ }
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (Exception e) {
+ // nothing to do
+ }
+ }
}
@Override
public void handleEvent(Event event) {
if (EventFilter.match(event, config)) {
- Socket socket = null;
- PrintWriter writer = null;
+ String data = marshaller.marshal(event);
+ // "connected" is re-read from the configuration for every matching event (CONNECTED_DEFAULT is "false").
+ boolean connected = Boolean.parseBoolean(getValue(config, CONNECTED_PROPERTY, CONNECTED_DEFAULT));
+
try {
- socket = new Socket(
- getValue(config, HOST_PROPERTY, HOST_DEFAULT),
- Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
- String data = marshaller.marshal(event);
- writer = new PrintWriter(socket.getOutputStream(), true);
+ // closeConnection() leaves the closed socket/writer in the fields, so a null check would skip reconnecting: always reconnect when not connected.
+ if (!connected) {
+ initConnection();
+ }
+
writer.println(data);
+
+ if (!connected) {
+ closeConnection();
+ }
} catch (Exception e) {
LOGGER.warn("Error sending data on the socket", e);
- } finally {
- if (writer != null) {
- writer.flush();
- writer.close();
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (Exception e) {
- // nothing to do
- }
- }
}
}
}
diff --git a/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java b/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java
new file mode 100644
index 0000000..0c59b59
--- /dev/null
+++ b/appender/socket/src/test/java/org/apache/karaf/decanter/appender/socket/SocketAppenderTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.socket;
+
+import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.marshaller.csv.CsvMarshaller;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.*;
+/** Exercises {@link SocketAppender} against a local {@link ServerSocket} with "connected" set to false and to true. */
+public class SocketAppenderTest {
+ // connected=false: activation needs no server; the event is sent over a socket opened on demand.
+ @Test(timeout = 60000L)
+ public void testNotConnected() throws Exception {
+ SocketAppender appender = new SocketAppender();
+ Marshaller marshaller = new CsvMarshaller();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put("connected", "false");
+ config.put("host", "localhost");
+ config.put("port", "44445");
+ appender.activate(config);
+
+ // no exception here: when not connected, the socket is only opened when a message is sent
+
+ final List<String> received = Collections.synchronizedList(new ArrayList<>());
+
+ Runnable server = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ServerSocket server = new ServerSocket(44445);
+ while (true) {
+ Socket socket = server.accept();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ received.add(line);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread serverThread = new Thread(server);
+ serverThread.start();
+ Thread.sleep(500); // gives time to server thread to start, otherwise the event is lost on connection refused
+ Map<String, String> data = new HashMap<>();
+ data.put("type", "test");
+ data.put("first", "1");
+ appender.handleEvent(new Event("test", data));
+
+ while (received.size() != 1) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(1, received.size());
+ Assert.assertEquals("type=test,first=1,event.topics=test", received.get(0));
+
+ serverThread.interrupt();
+ }
+ // connected=true: activation fails while nothing listens, then succeeds once the server is up and the event is streamed.
+ @Test(timeout = 60000L)
+ public void testConnected() throws Exception {
+ SocketAppender appender = new SocketAppender();
+ Marshaller marshaller = new CsvMarshaller();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put("connected", "true");
+ config.put("host", "localhost");
+ config.put("port", "44444");
+ try {
+ appender.activate(config);
+ Assert.fail("Expect ConnectionRefused exception here");
+ } catch (Exception e) {
+ // expected
+ }
+
+ // activation failed above because nothing is listening yet: start the server, then activate again
+
+ final List<String> received = Collections.synchronizedList(new ArrayList<>());
+
+ Runnable server = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ServerSocket server = new ServerSocket(44444);
+ while (true) {
+ Socket socket = server.accept();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ received.add(line);
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ Thread serverThread = new Thread(server);
+ serverThread.start();
+
+ // gives time to server thread to start
+ Thread.sleep(500);
+
+ appender.activate(config);
+
+ Map<String, String> data = new HashMap<>();
+ data.put("type", "test");
+ data.put("first", "1");
+ appender.handleEvent(new Event("test", data));
+
+ while (received.size() != 1) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(1, received.size());
+ Assert.assertEquals("type=test,first=1,event.topics=test", received.get(0));
+
+ serverThread.interrupt();
+ }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/appenders.adoc b/manual/src/main/asciidoc/user-guide/appenders.adoc
index 203e967..639ee30 100644
--- a/manual/src/main/asciidoc/user-guide/appenders.adoc
+++ b/manual/src/main/asciidoc/user-guide/appenders.adoc
@@ -534,10 +534,21 @@ containing:
# Port number where to send the collected data
#port=34343
+
+# If connected is true, a single socket connection is opened when the appender starts and
+# all collected data is streamed over that connection.
+# If connected is false (default), a new socket connection is opened for each event
+# and closed as soon as the event has been sent.
+#connected=false
+
+# Marshaller to use
+marshaller.target=(dataFormat=json)
----
* the `host` property contains the hostname or IP address of the remote network socket collector
* the `port` property contains the port number of the remote network socket collector
+* the `connected` property defines whether a single socket connection is created when the appender starts (`true`) or a new connection is created for each data event (`false`, the default).
+* the `marshaller.target` property defines the data format to use.
==== OrientDB