You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by cs...@apache.org on 2019/01/05 08:14:50 UTC

[aries-journaled-events] branch master updated: Replace sleep with wait/notify

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/aries-journaled-events.git


The following commit(s) were added to refs/heads/master by this push:
     new 4455f8e  Replace sleep with wait/notify
4455f8e is described below

commit 4455f8e3f93834a5f339ca4d6c723414690ed7b2
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Sat Jan 5 09:14:39 2019 +0100

    Replace sleep with wait/notify
---
 .../java/org/apache/aries/events/memory/Topic.java | 61 ++++++++++++----------
 1 file changed, 33 insertions(+), 28 deletions(-)

diff --git a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
index 8d4755e..d7703a9 100644
--- a/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
+++ b/org.apache.aries.events.memory/src/main/java/org/apache/aries/events/memory/Topic.java
@@ -20,7 +20,6 @@ package org.apache.aries.events.memory;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.aries.events.api.Message;
@@ -43,8 +42,9 @@ public class Topic {
         this.journal = new Journal<>();
     }
 
-    public Position send(Message message) {
+    public synchronized Position send(Message message) {
         long offset = this.journal.append(message);
+        notifyAll();
         return new MemoryPosition(offset);
     }
 
@@ -66,53 +66,58 @@ public class Topic {
         }
     }
 
+    private synchronized Entry<Long, Message> waitNext(long currentOffset) throws InterruptedException {
+        Entry<Long, Message> entry = journal.getNext(currentOffset);
+        if (entry != null) {
+            return entry;
+        }
+        log.debug("Waiting for next message");
+        wait();
+        return journal.getNext(currentOffset);
+    }
+
     class TopicSubscription implements Subscription {
         private Consumer<Received> callback;
         private ExecutorService executor;
-        private volatile boolean running;
         private long currentOffset;
 
         TopicSubscription(long startOffset, Consumer<Received> callback) {
             this.currentOffset = startOffset;
             this.callback = callback;
-            this.running = true;
             String name = "Poller for " + topicName;
             this.executor = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
             this.executor.execute(this::poll);
         }
-        
+
         private void poll() {
-            while (running) {
-                Entry<Long, Message> entry = journal.getNext(currentOffset);
-                if (entry != null) {
-                    long offset = entry.getKey();
-                    try {
-                        MemoryPosition position = new MemoryPosition(this.currentOffset);
-                        Received received = new Received(position, entry.getValue());
-                        callback.accept(received);
-                    } catch (Exception e) {
-                        log.warn(e.getMessage(), e);
-                    }
-                    this.currentOffset = offset + 1;
-                } else {
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                        // Ignore
+            try {
+                while (true) {
+                    Entry<Long, Message> entry = waitNext(currentOffset);
+                    if (entry != null) {
+                        handleMessage(entry);
                     }
                 }
+            } catch (InterruptedException e) {
+                log.debug("Poller thread for consumer on topic " + topicName + " stopped.");
             }
         }
 
+        private void handleMessage(Entry<Long, Message> entry) {
+            long offset = entry.getKey();
+            try {
+                MemoryPosition position = new MemoryPosition(this.currentOffset);
+                Received received = new Received(position, entry.getValue());
+                callback.accept(received);
+            } catch (Exception e) {
+                log.warn(e.getMessage(), e);
+            }
+            this.currentOffset = offset + 1;
+        }
+
         @Override
         public void close() {
-            this.running = false;
             executor.shutdown();
-            try {
-                executor.awaitTermination(10, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
+            executor.shutdownNow();
         }
 
     }