You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:19 UTC
[03/11] incubator-apex-core git commit: SPOI-1770 exposing list of latest 100 topics
SPOI-1770 exposing list of latest 100 topics
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4e5c9e56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4e5c9e56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4e5c9e56
Branch: refs/heads/devel-3
Commit: 4e5c9e561f34aed94f6f205f8b13cd290419f5a9
Parents: 0e3a272
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:02:44 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4e5c9e56/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index cd7a71f..21bfd73 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -8,11 +8,10 @@ import com.datatorrent.api.util.JacksonObjectMapperProvider;
import com.datatorrent.api.util.PubSubMessage;
import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.api.util.PubSubMessageCodec;
+import com.datatorrent.stram.util.LRUCache;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.servlet.http.HttpServletRequest;
@@ -37,6 +36,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
private InternalMessageHandler internalMessageHandler = null;
+ private static final int latestTopicCount = 100;
+ private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+ {
+ @Override
+ public Long put(String key, Long value)
+ {
+ remove(key); // this is to make the key the most recently inserted entry
+ return super.put(key, value);
+ }
+
+ };
public interface InternalMessageHandler
{
@@ -153,6 +163,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
public synchronized void publish(String topic, Object data)
{
+ latestTopics.put(topic, System.currentTimeMillis());
HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
if (wsSet != null) {
Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -220,6 +231,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
unsubscribe(this, topic + ".numSubscribers");
}
}
+ else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+ synchronized (this) {
+ sendData(this, "_latestTopics", latestTopics.keySet());
+ }
+ }
}
}
}