You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:19 UTC

[03/11] incubator-apex-core git commit: SPOI-1770 exposing list of latest 100 topics

SPOI-1770 exposing list of latest 100 topics


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4e5c9e56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4e5c9e56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4e5c9e56

Branch: refs/heads/devel-3
Commit: 4e5c9e561f34aed94f6f205f8b13cd290419f5a9
Parents: 0e3a272
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:02:44 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4e5c9e56/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index cd7a71f..21bfd73 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -8,11 +8,10 @@ import com.datatorrent.api.util.JacksonObjectMapperProvider;
 import com.datatorrent.api.util.PubSubMessage;
 import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.api.util.PubSubMessageCodec;
+import com.datatorrent.stram.util.LRUCache;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import javax.servlet.http.HttpServletRequest;
@@ -37,6 +36,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
   private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
+  private static final int latestTopicCount = 100;
+  private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+  {
+    @Override
+    public Long put(String key, Long value)
+    {
+      remove(key); // drop any existing entry first, so re-putting a known key makes it the most recently inserted entry
+      return super.put(key, value);
+    }
+
+  };
 
   public interface InternalMessageHandler
   {
@@ -153,6 +163,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   public synchronized void publish(String topic, Object data)
   {
+    latestTopics.put(topic, System.currentTimeMillis());
     HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
     if (wsSet != null) {
       Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -220,6 +231,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
                 unsubscribe(this, topic + ".numSubscribers");
               }
             }
+            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+              synchronized (this) {
+                sendData(this, "_latestTopics", latestTopics.keySet());
+              }
+            }
           }
         }
       }