You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/06/17 15:11:35 UTC

[karaf-decanter] branch master updated: [KARAF-6765] Improve socket collector to support "streaming" mode

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new 8060cba  [KARAF-6765] Improve socket collector to support "streaming" mode
     new 0dea51f  Merge pull request #186 from jbonofre/KARAF-6765
8060cba is described below

commit 8060cba5572b906df2e8f616af452ba648ed5bda
Author: jbonofre <jb...@apache.org>
AuthorDate: Wed Jun 17 06:48:18 2020 +0200

    [KARAF-6765] Improve socket collector to support "streaming" mode
---
 collector/socket/pom.xml                           |  12 ++
 .../decanter/collector/socket/SocketCollector.java |  49 ++++----
 .../collector/socket/SocketCollectorTest.java      | 136 +++++++++++++++++++++
 3 files changed, 169 insertions(+), 28 deletions(-)

diff --git a/collector/socket/pom.xml b/collector/socket/pom.xml
index bb221c7..9d3e070 100644
--- a/collector/socket/pom.xml
+++ b/collector/socket/pom.xml
@@ -42,6 +42,18 @@
             <groupId>org.apache.karaf.decanter.collector</groupId>
             <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
         </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.karaf.decanter.marshaller</groupId>
+            <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.johnzon</groupId>
+            <artifactId>johnzon-mapper</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 65b8d28..0ff1c27 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -71,7 +71,11 @@ public class SocketCollector implements Closeable, Runnable {
 
     @Activate
     public void activate(ComponentContext context) throws IOException {
-        this.properties = context.getProperties();
+        activate(context.getProperties()); // delegate to the Dictionary-based overload, which can also be called directly without a ComponentContext
+    }
+
+    public void activate(Dictionary<String, Object> properties) throws IOException {
+        this.properties = properties;
         int port = Integer.parseInt(getProperty(this.properties, "port", "34343"));
         int workers = Integer.parseInt(getProperty(this.properties, "workers", "10"));
 
@@ -80,9 +84,9 @@ public class SocketCollector implements Closeable, Runnable {
         if (this.protocol == null) {
             this.protocol = Protocol.TCP;
         }
-        
+
         eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
-        
+
         switch (protocol) {
             case TCP:
                 this.serverSocket = new ServerSocket(port);
@@ -91,7 +95,7 @@ public class SocketCollector implements Closeable, Runnable {
                 this.datagramSocket = new DatagramSocket(port);
                 break;
         }
-        
+
         // adding 1 for serverSocket handling
         this.executor = Executors.newFixedThreadPool(workers + 1);
         this.executor.execute(this);
@@ -162,32 +166,21 @@ public class SocketCollector implements Closeable, Runnable {
         }
 
         public void run() {
-            try (BufferedInputStream bis = new BufferedInputStream(clientSocket.getInputStream())) {
-                Map<String, Object> data = new HashMap<>();
-                data.put("type", "socket");
-                try {
-                    data.putAll(unmarshaller.unmarshal(bis));
-                } catch (Exception e) {
-                    // nothing to do
-                }
-
-                try {
-                    PropertiesPreparator.prepare(data, properties);
-                } catch (Exception e) {
-                    LOGGER.warn("Can't prepare data for the dispatcher", e);
-                }
-
-                Event event = new Event(eventAdminTopic, data);
-                dispatcher.postEvent(event);
-            } catch (EOFException e) {
-                LOGGER.warn("Client closed the connection", e);
-            } catch (IOException e) {
-                LOGGER.warn("Exception receiving data", e);
-            }
             try {
+                // Streaming mode: every line received on the connection is one message and yields one event.
+                // UTF-8 is named explicitly so decoding does not depend on the platform default charset.
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
+                    for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                        Map<String, Object> data = new HashMap<>();
+                        data.put("type", "socket");
+                        data.putAll(unmarshaller.unmarshal(new ByteArrayInputStream(line.getBytes(java.nio.charset.StandardCharsets.UTF_8))));
+                        PropertiesPreparator.prepare(data, properties);
+                        dispatcher.postEvent(new Event(eventAdminTopic, data));
+                    }
+                }
                 clientSocket.close();
-            } catch (IOException e) {
-                LOGGER.info("Error closing socket", e);
+            } catch (Exception e) {
+                LOGGER.warn("Exception receiving data", e);
             }
         }
     }
diff --git a/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java b/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java
new file mode 100644
index 0000000..3d77020
--- /dev/null
+++ b/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.socket;
+
+import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+public class SocketCollectorTest {
+
+    /**
+     * A single JSON line sent over TCP is posted as exactly one event on the default topic.
+     */
+    @Test(timeout = 60000)
+    public void singleMessageTest() throws Exception {
+        DispatcherMock dispatcher = new DispatcherMock();
+        SocketCollector collector = newCollector(dispatcher);
+        try (Socket socket = new Socket("localhost", 34343);
+             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
+            writer.println("{\"foo\":\"bar\"}");
+            awaitPostedEvents(dispatcher, 1);
+
+            Assert.assertEquals(1, dispatcher.postedEvents.size());
+            Assert.assertEquals("socket", dispatcher.postedEvents.get(0).getProperty("type"));
+            Assert.assertEquals("decanter/collect/socket", dispatcher.postedEvents.get(0).getProperty("event.topics"));
+            Assert.assertEquals("bar", dispatcher.postedEvents.get(0).getProperty("foo"));
+        } finally {
+            collector.close();
+        }
+    }
+
+    /**
+     * Two JSON lines written on the same connection produce two events, in the order sent.
+     */
+    @Test(timeout = 60000)
+    public void testStreaming() throws Exception {
+        DispatcherMock dispatcher = new DispatcherMock();
+        SocketCollector collector = newCollector(dispatcher);
+        try (Socket socket = new Socket("localhost", 34343);
+             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
+            writer.println("{\"first\":1}");
+            writer.println("{\"second\":2}");
+            awaitPostedEvents(dispatcher, 2);
+
+            Assert.assertEquals(2, dispatcher.postedEvents.size());
+            Assert.assertEquals(1L, dispatcher.postedEvents.get(0).getProperty("first"));
+            Assert.assertEquals(2L, dispatcher.postedEvents.get(1).getProperty("second"));
+        } finally {
+            collector.close();
+        }
+    }
+
+    /**
+     * Two clients connected at the same time each get their message posted.
+     */
+    @Test(timeout = 60000)
+    public void testMultiClients() throws Exception {
+        DispatcherMock dispatcher = new DispatcherMock();
+        SocketCollector collector = newCollector(dispatcher);
+        try (Socket client1 = new Socket("localhost", 34343);
+             Socket client2 = new Socket("localhost", 34343);
+             PrintWriter writer1 = new PrintWriter(client1.getOutputStream(), true);
+             PrintWriter writer2 = new PrintWriter(client2.getOutputStream(), true)) {
+            writer1.println("{\"client\":\"client1\"}");
+            writer2.println("{\"client\":\"client2\"}");
+            awaitPostedEvents(dispatcher, 2);
+
+            Assert.assertEquals(2, dispatcher.postedEvents.size());
+        } finally {
+            collector.close();
+        }
+    }
+
+    /**
+     * Creates a collector with a JSON unmarshaller and the given dispatcher, activated with default settings.
+     */
+    private static SocketCollector newCollector(DispatcherMock dispatcher) throws Exception {
+        SocketCollector collector = new SocketCollector();
+        collector.unmarshaller = new JsonUnmarshaller();
+        collector.dispatcher = dispatcher;
+        collector.activate(new Hashtable<>());
+        return collector;
+    }
+
+    /**
+     * Polls until at least {@code expected} events were posted; the calling test's timeout bounds the wait.
+     */
+    private static void awaitPostedEvents(DispatcherMock dispatcher, int expected) throws InterruptedException {
+        while (dispatcher.postedEvents.size() < expected) {
+            Thread.sleep(200);
+        }
+    }
+
+    /**
+     * Minimal {@link EventAdmin} that records every event it receives.
+     * Events are added by the collector's socket-handling threads while the test thread polls,
+     * so both lists are synchronized.
+     */
+    static class DispatcherMock implements EventAdmin {
+        public final List<Event> postedEvents = java.util.Collections.synchronizedList(new ArrayList<>());
+        public final List<Event> sentEvents = java.util.Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+    }
+
+}