You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/06/17 15:11:35 UTC
[karaf-decanter] branch master updated: [KARAF-6765] Improve socket collector to support "streaming" mode
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new 8060cba [KARAF-6765] Improve socket collector to support "streaming" mode
new 0dea51f Merge pull request #186 from jbonofre/KARAF-6765
8060cba is described below
commit 8060cba5572b906df2e8f616af452ba648ed5bda
Author: jbonofre <jb...@apache.org>
AuthorDate: Wed Jun 17 06:48:18 2020 +0200
[KARAF-6765] Improve socket collector to support "streaming" mode
---
collector/socket/pom.xml | 12 ++
.../decanter/collector/socket/SocketCollector.java | 49 ++++----
.../collector/socket/SocketCollectorTest.java | 136 +++++++++++++++++++++
3 files changed, 169 insertions(+), 28 deletions(-)
diff --git a/collector/socket/pom.xml b/collector/socket/pom.xml
index bb221c7..9d3e070 100644
--- a/collector/socket/pom.xml
+++ b/collector/socket/pom.xml
@@ -42,6 +42,18 @@
<groupId>org.apache.karaf.decanter.collector</groupId>
<artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
</dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.apache.karaf.decanter.marshaller</groupId>
+ <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.johnzon</groupId>
+ <artifactId>johnzon-mapper</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 65b8d28..0ff1c27 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -71,7 +71,11 @@ public class SocketCollector implements Closeable, Runnable {
@Activate
public void activate(ComponentContext context) throws IOException {
- this.properties = context.getProperties();
+ activate(context.getProperties());
+ }
+
+ public void activate(Dictionary<String, Object> properties) throws IOException {
+ this.properties = properties;
int port = Integer.parseInt(getProperty(this.properties, "port", "34343"));
int workers = Integer.parseInt(getProperty(this.properties, "workers", "10"));
@@ -80,9 +84,9 @@ public class SocketCollector implements Closeable, Runnable {
if (this.protocol == null) {
this.protocol = Protocol.TCP;
}
-
+
eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
-
+
switch (protocol) {
case TCP:
this.serverSocket = new ServerSocket(port);
@@ -91,7 +95,7 @@ public class SocketCollector implements Closeable, Runnable {
this.datagramSocket = new DatagramSocket(port);
break;
}
-
+
// adding 1 for serverSocket handling
this.executor = Executors.newFixedThreadPool(workers + 1);
this.executor.execute(this);
@@ -162,32 +166,21 @@ public class SocketCollector implements Closeable, Runnable {
}
public void run() {
- try (BufferedInputStream bis = new BufferedInputStream(clientSocket.getInputStream())) {
- Map<String, Object> data = new HashMap<>();
- data.put("type", "socket");
- try {
- data.putAll(unmarshaller.unmarshal(bis));
- } catch (Exception e) {
- // nothing to do
- }
-
- try {
- PropertiesPreparator.prepare(data, properties);
- } catch (Exception e) {
- LOGGER.warn("Can't prepare data for the dispatcher", e);
- }
-
- Event event = new Event(eventAdminTopic, data);
- dispatcher.postEvent(event);
- } catch (EOFException e) {
- LOGGER.warn("Client closed the connection", e);
- } catch (IOException e) {
- LOGGER.warn("Exception receiving data", e);
- }
try {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ Map<String, Object> data = new HashMap<>();
+ data.put("type", "socket");
+ data.putAll(unmarshaller.unmarshal(new ByteArrayInputStream(line.getBytes())));
+ PropertiesPreparator.prepare(data, properties);
+ Event event = new Event(eventAdminTopic, data);
+ dispatcher.postEvent(event);
+ }
+ }
clientSocket.close();
- } catch (IOException e) {
- LOGGER.info("Error closing socket", e);
+ } catch (Exception e) {
+ e.printStackTrace();
}
}
}
diff --git a/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java b/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java
new file mode 100644
index 0000000..3d77020
--- /dev/null
+++ b/collector/socket/src/test/java/org/apache/karaf/decanter/collector/socket/SocketCollectorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.socket;
+
+import org.apache.karaf.decanter.marshaller.json.JsonUnmarshaller;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+public class SocketCollectorTest {
+
+ @Test(timeout = 60000)
+ public void singleMessageTest() throws Exception {
+ DispatcherMock dispatcher = new DispatcherMock();
+
+ SocketCollector collector = new SocketCollector();
+ collector.unmarshaller = new JsonUnmarshaller();
+ collector.dispatcher = dispatcher;
+ collector.activate(new Hashtable<>());
+
+ Socket socket = new Socket("localhost", 34343);
+
+ PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
+ writer.println("{\"foo\":\"bar\"}");
+ writer.flush();
+
+ while (dispatcher.postedEvents.size() < 1) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(1, dispatcher.postedEvents.size());
+
+ Assert.assertEquals("socket", dispatcher.postedEvents.get(0).getProperty("type"));
+ Assert.assertEquals("decanter/collect/socket", dispatcher.postedEvents.get(0).getProperty("event.topics"));
+ Assert.assertEquals("bar", dispatcher.postedEvents.get(0).getProperty("foo"));
+
+ collector.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testStreaming() throws Exception {
+ DispatcherMock dispatcher = new DispatcherMock();
+
+ SocketCollector collector = new SocketCollector();
+ collector.dispatcher = dispatcher;
+ collector.unmarshaller = new JsonUnmarshaller();
+ collector.activate(new Hashtable<>());
+
+ Socket socket = new Socket("localhost", 34343);
+
+ PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
+ writer.println("{\"first\":1}");
+ writer.flush();
+ writer.println("{\"second\":2}");
+ writer.flush();
+
+ while (dispatcher.postedEvents.size() < 2) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+ Assert.assertEquals(1L, dispatcher.postedEvents.get(0).getProperty("first"));
+ Assert.assertEquals(2L, dispatcher.postedEvents.get(1).getProperty("second"));
+
+ collector.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultiClients() throws Exception {
+ DispatcherMock dispatcher = new DispatcherMock();
+
+ SocketCollector collector = new SocketCollector();
+ collector.dispatcher = dispatcher;
+ collector.unmarshaller = new JsonUnmarshaller();
+ collector.activate(new Hashtable<>());
+
+ Socket client1 = new Socket("localhost", 34343);
+ Socket client2 = new Socket("localhost", 34343);
+
+ PrintWriter writer1 = new PrintWriter(client1.getOutputStream(), true);
+ writer1.println("{\"client\":\"client1\"}");
+ writer1.flush();
+
+ PrintWriter writer2 = new PrintWriter(client2.getOutputStream(), true);
+ writer2.println("{\"client\":\"client2\"}");
+ writer2.flush();
+
+ while (dispatcher.postedEvents.size() < 2) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+ collector.close();
+ }
+
+ class DispatcherMock implements EventAdmin {
+
+ public List<Event> postedEvents = new ArrayList<>();
+ public List<Event> sentEvents = new ArrayList<>();
+
+ @Override
+ public void postEvent(Event event) {
+ postedEvents.add(event);
+ }
+
+ @Override
+ public void sendEvent(Event event) {
+ sentEvents.add(event);
+ }
+
+ }
+
+}