You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:17 UTC
[01/11] incubator-apex-core git commit: SPOI-2255 #resolve password protect the websocket
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 a3e861ef4 -> d748ed46f
SPOI-2255 #resolve password protect the websocket
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/55844f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/55844f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/55844f86
Branch: refs/heads/devel-3
Commit: 55844f86af24a46907a8441787bddcf8a437e182
Parents: 9a9f153
Author: David Yan <da...@datatorrent.com>
Authored: Mon Apr 7 14:40:02 2014 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 50 +++++++++++++++++++++++++++++-----------
1 file changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/55844f86/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 11bf98e..21b48f7 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,6 +4,7 @@
*/
package com.datatorrent.gateway;
+import com.datatorrent.gateway.security.AuthenticationException;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
@@ -23,6 +24,11 @@ import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.lib.util.PubSubMessageCodec;
import com.datatorrent.stram.util.LRUCache;
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response.Status;
/**
* <p>PubSubWebSocketServlet class.</p>
@@ -40,9 +46,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
private InternalMessageHandler internalMessageHandler = null;
private static final int latestTopicCount = 100;
+ private final DTGateway gateway;
private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
{
private static final long serialVersionUID = 20140131L;
+
@Override
public Long put(String key, Long value)
{
@@ -58,26 +66,42 @@ public class PubSubWebSocketServlet extends WebSocketServlet
}
- /*
- private int timeout;
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
- */
+ public PubSubWebSocketServlet(DTGateway gateway)
+ {
+ this.gateway = gateway;
+ }
- /*
- private int timeout;
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
- */
public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
{
this.internalMessageHandler = internalMessageHandler;
}
@Override
+ protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+ {
+ if ("simple".equals(gateway.getWebAuthType())) {
+ Cookie[] cookies = request.getCookies();
+ if (cookies != null) {
+ for (Cookie cookie : cookies) {
+ if ("session".equals(cookie.getName())) {
+ try {
+ gateway.getAuthDatabase().authenticateSession(cookie.getValue());
+ }
+ catch (AuthenticationException ex) {
+ throw new WebApplicationException(ex, Status.FORBIDDEN);
+ }
+ super.service(request, response);
+ }
+ }
+ }
+ throw new WebApplicationException(Status.FORBIDDEN);
+ }
+ else {
+ super.service(request, response);
+ }
+ }
+
+ @Override
public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
{
return new PubSubWebSocket();
[03/11] incubator-apex-core git commit: SPOI-1770 exposing list of latest 100 topics
Posted by th...@apache.org.
SPOI-1770 exposing list of latest 100 topics
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4e5c9e56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4e5c9e56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4e5c9e56
Branch: refs/heads/devel-3
Commit: 4e5c9e561f34aed94f6f205f8b13cd290419f5a9
Parents: 0e3a272
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:02:44 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4e5c9e56/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index cd7a71f..21bfd73 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -8,11 +8,10 @@ import com.datatorrent.api.util.JacksonObjectMapperProvider;
import com.datatorrent.api.util.PubSubMessage;
import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.api.util.PubSubMessageCodec;
+import com.datatorrent.stram.util.LRUCache;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.servlet.http.HttpServletRequest;
@@ -37,6 +36,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
private InternalMessageHandler internalMessageHandler = null;
+ private static final int latestTopicCount = 100;
+ private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+ {
+ @Override
+ public Long put(String key, Long value)
+ {
+ remove(key); // this is to make the key the most recently inserted entry
+ return super.put(key, value);
+ }
+
+ };
public interface InternalMessageHandler
{
@@ -153,6 +163,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
public synchronized void publish(String topic, Object data)
{
+ latestTopics.put(topic, System.currentTimeMillis());
HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
if (wsSet != null) {
Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -220,6 +231,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
unsubscribe(this, topic + ".numSubscribers");
}
}
+ else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+ synchronized (this) {
+ sendData(this, "_latestTopics", latestTopics.keySet());
+ }
+ }
}
}
}
[04/11] incubator-apex-core git commit: change name and jump version
Posted by th...@apache.org.
change name and jump version
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0e3a2728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0e3a2728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0e3a2728
Branch: refs/heads/devel-3
Commit: 0e3a2728aaab37fc1ce938efef33649899f166a3
Parents: a3e861e
Author: David Yan <da...@datatorrent.com>
Authored: Tue Oct 15 20:00:51 2013 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 284 +++++++++++++++++++++++++++++++++++++++
1 file changed, 284 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0e3a2728/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
new file mode 100644
index 0000000..cd7a71f
--- /dev/null
+++ b/PubSubWebSocketServlet.java
@@ -0,0 +1,284 @@
+/*
+ * Copyright (c) 2012-2013 DataTorrent, Inc.
+ * All Rights Reserved.
+ */
+package com.datatorrent.gateway;
+
+import com.datatorrent.api.util.JacksonObjectMapperProvider;
+import com.datatorrent.api.util.PubSubMessage;
+import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.api.util.PubSubMessageCodec;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import javax.servlet.http.HttpServletRequest;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>PubSubWebSocketServlet class.</p>
+ *
+ * @author David Yan <da...@datatorrent.com>
+ * @since 0.3.2
+ */
+public class PubSubWebSocketServlet extends WebSocketServlet
+{
+ private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
+ private static final long serialVersionUID = 1L;
+ private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
+ private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
+ private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
+ private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
+ private InternalMessageHandler internalMessageHandler = null;
+
+ public interface InternalMessageHandler
+ {
+ void onMessage(String topic, Object data);
+
+ }
+
+ /*
+ private int timeout;
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+ */
+
+ /*
+ private int timeout;
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+ */
+ public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
+ {
+ this.internalMessageHandler = internalMessageHandler;
+ }
+
+ @Override
+ public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
+ {
+ return new PubSubWebSocket();
+ }
+
+ private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
+ {
+ HashSet<PubSubWebSocket> wsSet;
+ if (!topicToSocketMap.containsKey(topic)) {
+ wsSet = new HashSet<PubSubWebSocket>();
+ topicToSocketMap.put(topic, wsSet);
+ }
+ else {
+ wsSet = topicToSocketMap.get(topic);
+ }
+ wsSet.add(webSocket);
+
+ HashSet<String> topicSet;
+ if (!socketToTopicMap.containsKey(webSocket)) {
+ topicSet = new HashSet<String>(0);
+ socketToTopicMap.put(webSocket, topicSet);
+ }
+ else {
+ topicSet = socketToTopicMap.get(webSocket);
+ }
+ topicSet.add(topic);
+ publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ }
+
+ private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
+ {
+ if (!topicToSocketMap.containsKey(topic)) {
+ return;
+ }
+ HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+ wsSet.remove(webSocket);
+ if (wsSet.isEmpty()) {
+ topicToSocketMap.remove(topic);
+ }
+ if (!socketToTopicMap.containsKey(webSocket)) {
+ return;
+ }
+ HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+ topicSet.remove(topic);
+ if (topicSet.isEmpty()) {
+ socketToTopicMap.remove(webSocket);
+ }
+ publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ }
+
+ private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
+ {
+ HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+ if (topicSet != null) {
+ for (String topic : topicSet) {
+ HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+ wsSet.remove(webSocket);
+ if (wsSet.isEmpty()) {
+ topicToSocketMap.remove(topic);
+ }
+ publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ }
+ socketToTopicMap.remove(webSocket);
+ }
+ }
+
+ private synchronized void disconnect(PubSubWebSocket webSocket)
+ {
+ unsubscribeAll(webSocket);
+ }
+
+ public synchronized int getNumSubscribers(String topic)
+ {
+ HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+ return wsSet == null ? 0 : wsSet.size();
+ }
+
+ private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
+ {
+ PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
+ pubSubMessage.setType(PubSubMessageType.DATA);
+ pubSubMessage.setTopic(topic);
+ pubSubMessage.setData(data);
+ LOG.debug("Sending data of {} to subscriber...", topic);
+ webSocket.sendMessage(codec.formatMessage(pubSubMessage));
+ }
+
+ public synchronized void publish(String topic, Object data)
+ {
+ HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+ if (wsSet != null) {
+ Iterator<PubSubWebSocket> it = wsSet.iterator();
+ while (it.hasNext()) {
+ PubSubWebSocket socket = it.next();
+ try {
+ sendData(socket, topic, data);
+ }
+ catch (Exception ex) {
+ it.remove();
+ disconnect(socket);
+ }
+ }
+ }
+ }
+
+ private class PubSubWebSocket implements WebSocket.OnTextMessage
+ {
+ private Connection connection;
+ private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+ private Thread messengerThread = new Thread(new Messenger());
+
+ @Override
+ public void onMessage(String message)
+ {
+ LOG.debug("Received message {}", message);
+ try {
+ @SuppressWarnings("unchecked")
+ PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
+ if (pubSubMessage != null) {
+ PubSubMessageType type = pubSubMessage.getType();
+ String topic = pubSubMessage.getTopic();
+ if (type != null) {
+ if (type.equals(PubSubMessageType.SUBSCRIBE)) {
+ if (topic != null) {
+ subscribe(this, topic);
+ }
+ }
+ else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
+ if (topic != null) {
+ unsubscribe(this, topic);
+ }
+ }
+ else if (type.equals(PubSubMessageType.PUBLISH)) {
+ if (topic != null) {
+ Object data = pubSubMessage.getData();
+ if (data != null) {
+ publish(topic, data);
+ }
+ if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+ if (internalMessageHandler != null) {
+ internalMessageHandler.onMessage(topic, data);
+ }
+ }
+ }
+ }
+ else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
+ if (topic != null) {
+ subscribe(this, topic + ".numSubscribers");
+ sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ }
+ }
+ else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
+ if (topic != null) {
+ unsubscribe(this, topic + ".numSubscribers");
+ }
+ }
+ }
+ }
+ }
+ catch (Exception ex) {
+ LOG.warn("Exception caught", ex);
+ }
+ }
+
+ @Override
+ public void onOpen(Connection connection)
+ {
+ LOG.debug("onOpen");
+ this.connection = connection;
+ this.connection.setMaxIdleTime(60 * 60 * 1000); // idle time set to one hour to clear out idle connections from taking resources
+ messengerThread.start();
+ }
+
+ @Override
+ public void onClose(int i, String string)
+ {
+ LOG.debug("onClose");
+ disconnect(this);
+ messengerThread.interrupt();
+ }
+
+ public void sendMessage(String message) throws IllegalStateException
+ {
+ messageQueue.add(message);
+ }
+
+ /*
+ * This class exists only because Jetty 8 does not support async write for websocket
+ *
+ */
+ private class Messenger implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ while (!Thread.interrupted()) {
+ try {
+ String message = messageQueue.take();
+ // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
+ // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
+ // When we can use Java 7, we need to upgrade to Jetty 9.
+ connection.sendMessage(message);
+ }
+ catch (InterruptedException ex) {
+ return;
+ }
+ catch (Exception ex) {
+ LOG.error("Caught exception in websocket messenger.", ex);
+ return;
+ }
+ }
+ }
+
+ }
+
+ }
+
+}
[10/11] incubator-apex-core git commit: APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.
Posted by th...@apache.org.
APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aceaeebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aceaeebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aceaeebe
Branch: refs/heads/devel-3
Commit: aceaeebe2eda27337bc2dc38f71c11156f983bdb
Parents: 7488444
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 10:29:22 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:42 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 437 -------------------
engine/pom.xml | 1 -
.../stram/util/PubSubWebSocketServlet.java | 376 ++++++++++++++++
3 files changed, 376 insertions(+), 438 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
deleted file mode 100644
index b039f44..0000000
--- a/PubSubWebSocketServlet.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Copyright (c) 2012-2013 DataTorrent, Inc.
- * All Rights Reserved.
- */
-package com.datatorrent.gateway;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import javax.servlet.FilterChain;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.common.util.PubSubMessage;
-import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.common.util.PubSubMessageCodec;
-
-import com.datatorrent.gateway.security.AuthDatabase;
-import com.datatorrent.gateway.security.AuthenticationException;
-import com.datatorrent.gateway.security.DTPrincipal;
-import com.datatorrent.stram.util.JSONSerializationProvider;
-import com.datatorrent.stram.util.LRUCache;
-
-
-/**
- * <p>PubSubWebSocketServlet class.</p>
- *
- * @author David Yan <da...@datatorrent.com>
- * @since 0.3.2
- */
-public class PubSubWebSocketServlet extends WebSocketServlet
-{
- private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
- private static final long serialVersionUID = 1L;
- private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
- private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
- private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
- private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
- private InternalMessageHandler internalMessageHandler = null;
- private static final int latestTopicCount = 100;
- private final DTGateway gateway;
- private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal";
- private SubscribeFilter subscribeFilter;
- private SendFilter sendFilter;
- private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
- {
- private static final long serialVersionUID = 20140131L;
-
- @Override
- public Long put(String key, Long value)
- {
- remove(key); // this is to make the key the most recently inserted entry
- return super.put(key, value);
- }
-
- };
-
- public interface SubscribeFilter
- {
-
- /**
- * Returns whether or not the principal is allowed to subscribe to this topic
- *
- * @param gateway
- * @param principal
- * @param topic
- * @return
- */
- boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
- }
-
- public interface SendFilter
- {
-
- /**
- * Returns the data it should be sent given the principal
- *
- * @param gateway
- * @param principal
- * @param topic
- * @param data
- * @return the data it should send to the websocket
- */
- Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data);
- }
-
- public void registerSubscribeFilter(SubscribeFilter filter)
- {
- subscribeFilter = filter;
- }
-
- public void registerSendFilter(SendFilter filter)
- {
- sendFilter = filter;
- }
-
- public interface InternalMessageHandler
- {
- void onMessage(String topic, Object data);
-
- }
-
- public PubSubWebSocketServlet(DTGateway gateway)
- {
- this.gateway = gateway;
- }
-
- public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
- {
- this.internalMessageHandler = internalMessageHandler;
- }
-
- @Override
- protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
- {
- boolean handled = false;
- DTPrincipal principal = null;
- AuthDatabase auth = gateway.getAuthDatabase();
- if (gateway.isDTSessionHandled()) {
- //if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
- Cookie[] cookies = request.getCookies();
- if (cookies != null) {
- for (Cookie cookie : cookies) {
- if ("session".equals(cookie.getName())) {
- try {
- principal = auth.authenticateSession(cookie.getValue());
- //request.setAttribute(AUTH_ATTRIBUTE, principal);
- } catch (AuthenticationException ex) {
- /* commenting this out to allow anonymous publish from stram
- throw new WebApplicationException(ex, Status.FORBIDDEN);
- */
- }
- //super.service(request, response);
- }
- }
- }
- /* commenting this out to allow anonymous publish from stram
- throw new WebApplicationException(Status.UNAUTHORIZED);
- */
- //}
- } else if (gateway.isHadoopAuthFilterHandled()) {
- final UserHolder userHolder = new UserHolder();
- gateway.getHadoopAuthFilter().doFilter(request, response, new FilterChain()
- {
- @Override
- public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException, ServletException
- {
- userHolder.username = ((HttpServletRequest)servletRequest).getUserPrincipal().getName();
- }
- });
- if (response.getStatus() == HttpServletResponse.SC_OK) {
- principal = auth.getUser(userHolder.username);
- } else {
- handled = true;
- }
- }
- if (!handled) {
- if (principal != null) {
- request.setAttribute(AUTH_ATTRIBUTE, principal);
- }
- super.service(request, response);
- }
- }
-
- private class UserHolder {
- public String username;
- }
-
- @Override
- public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
- {
- DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
- return new PubSubWebSocket(principal);
- }
-
- private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
- {
- if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(), topic)) {
- LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
- return;
- }
- else {
- LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
- }
-
- HashSet<PubSubWebSocket> wsSet;
- if (!topicToSocketMap.containsKey(topic)) {
- wsSet = new HashSet<PubSubWebSocket>();
- topicToSocketMap.put(topic, wsSet);
- }
- else {
- wsSet = topicToSocketMap.get(topic);
- }
- wsSet.add(webSocket);
-
- HashSet<String> topicSet;
- if (!socketToTopicMap.containsKey(webSocket)) {
- topicSet = new HashSet<String>(0);
- socketToTopicMap.put(webSocket, topicSet);
- }
- else {
- topicSet = socketToTopicMap.get(webSocket);
- }
- topicSet.add(topic);
- publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
- }
-
- private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
- {
- if (!topicToSocketMap.containsKey(topic)) {
- return;
- }
- HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
- wsSet.remove(webSocket);
- if (wsSet.isEmpty()) {
- topicToSocketMap.remove(topic);
- }
- if (!socketToTopicMap.containsKey(webSocket)) {
- return;
- }
- HashSet<String> topicSet = socketToTopicMap.get(webSocket);
- topicSet.remove(topic);
- if (topicSet.isEmpty()) {
- socketToTopicMap.remove(webSocket);
- }
- publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
- }
-
- private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
- {
- HashSet<String> topicSet = socketToTopicMap.get(webSocket);
- if (topicSet != null) {
- for (String topic : topicSet) {
- HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
- wsSet.remove(webSocket);
- if (wsSet.isEmpty()) {
- topicToSocketMap.remove(topic);
- }
- publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
- }
- socketToTopicMap.remove(webSocket);
- }
- }
-
- private synchronized void disconnect(PubSubWebSocket webSocket)
- {
- unsubscribeAll(webSocket);
- }
-
- public synchronized int getNumSubscribers(String topic)
- {
- HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
- return wsSet == null ? 0 : wsSet.size();
- }
-
- private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
- {
- PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
- pubSubMessage.setType(PubSubMessageType.DATA);
- pubSubMessage.setTopic(topic);
- pubSubMessage.setData(data);
- LOG.debug("Sending data {} to subscriber...", topic);
- webSocket.sendMessage(codec.formatMessage(pubSubMessage));
- }
-
- public synchronized void publish(String topic, Object data)
- {
- if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
- latestTopics.put(topic, System.currentTimeMillis());
- }
- HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
- if (wsSet != null) {
- Iterator<PubSubWebSocket> it = wsSet.iterator();
- while (it.hasNext()) {
- PubSubWebSocket socket = it.next();
- try {
- if (sendFilter != null) {
- Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic, data);
- sendData(socket, topic, filteredData);
- }
- else {
- sendData(socket, topic, data);
- }
- }
- catch (Exception ex) {
- LOG.error("Cannot send message", ex);
- it.remove();
- disconnect(socket);
- }
- }
- }
- }
-
- private class PubSubWebSocket implements WebSocket.OnTextMessage
- {
- private Connection connection;
- private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
- private final Thread messengerThread = new Thread(new Messenger());
- private final DTPrincipal principal;
-
- public PubSubWebSocket(DTPrincipal principal)
- {
- this.principal = principal;
- }
-
- public DTPrincipal getPrincipal()
- {
- return principal;
- }
-
- @Override
- public void onMessage(String message)
- {
- LOG.debug("Received message {}", message);
- try {
- @SuppressWarnings("unchecked")
- PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
- if (pubSubMessage != null) {
- PubSubMessageType type = pubSubMessage.getType();
- String topic = pubSubMessage.getTopic();
- if (type != null) {
- if (type.equals(PubSubMessageType.SUBSCRIBE)) {
- if (topic != null) {
- subscribe(this, topic);
- }
- }
- else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
- if (topic != null) {
- unsubscribe(this, topic);
- }
- }
- else if (type.equals(PubSubMessageType.PUBLISH)) {
- if (topic != null) {
- Object data = pubSubMessage.getData();
- if (data != null) {
- publish(topic, data);
- }
- if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
- if (internalMessageHandler != null) {
- internalMessageHandler.onMessage(topic, data);
- }
- }
- }
- }
- else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
- if (topic != null) {
- subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
- sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
- }
- }
- else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
- if (topic != null) {
- unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
- }
- }
- else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
- synchronized (this) {
- sendData(this, "_latestTopics", latestTopics.keySet());
- }
- }
- }
- }
- }
- catch (Exception ex) {
- LOG.warn("Exception caught", ex);
- }
- }
-
- @Override
- public void onOpen(Connection connection)
- {
- LOG.debug("onOpen");
- this.connection = connection;
- this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear out idle connections from taking resources
- this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
- messengerThread.start();
- }
-
- @Override
- public void onClose(int i, String string)
- {
- LOG.debug("onClose");
- disconnect(this);
- messengerThread.interrupt();
- }
-
- public void sendMessage(String message) throws IllegalStateException
- {
- messageQueue.add(message);
- }
-
- /*
- * This class exists only because Jetty 8 does not support async write for websocket
- *
- */
- private class Messenger implements Runnable
- {
- @Override
- public void run()
- {
- while (!Thread.interrupted()) {
- try {
- String message = messageQueue.take();
- // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
- // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
- // When we can use Java 7, we need to upgrade to Jetty 9.
- connection.sendMessage(message);
- }
- catch (InterruptedException ex) {
- return;
- }
- catch (Exception ex) {
- LOG.error("Caught exception in websocket messenger.", ex);
- return;
- }
- }
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 21eaa00..2b8f66a 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -272,7 +272,6 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
<version>${jetty.version}</version>
- <scope>test</scope>
</dependency>
<!-- use shaded asm library to avoid conflict -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
new file mode 100644
index 0000000..d8c3df8
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2012-2013 DataTorrent, Inc.
+ * All Rights Reserved.
+ */
+package com.datatorrent.stram.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocket.Connection;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.util.PubSubMessage;
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.common.util.PubSubMessageCodec;
+
+
+/**
+ * <p>PubSubWebSocketServlet class.</p>
+ *
+ * @author David Yan <da...@datatorrent.com>
+ * @since 0.3.2
+ */
+public class PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL> extends WebSocketServlet
+{
+ private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
+ private static final long serialVersionUID = 1L;
+ private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
+ private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
+ private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
+ private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
+ private InternalMessageHandler internalMessageHandler = null;
+ private static final int latestTopicCount = 100;
+ protected SECURITY_CONTEXT securityContext;
+ private SubscribeFilter subscribeFilter;
+ private SendFilter sendFilter;
+ private String authAttribute;
+ private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+ {
+ private static final long serialVersionUID = 20140131L;
+
+ @Override
+ public Long put(String key, Long value)
+ {
+ remove(key); // this is to make the key the most recently inserted entry
+ return super.put(key, value);
+ }
+
+ };
+
+ /**
+ * Hook consulted by the servlet before a websocket is added to a topic's subscriber set.
+ *
+ * @param <SECURITY_CONTEXT> type of the security context the servlet was constructed with
+ * @param <PRINCIPAL> type of the principal attached to each websocket
+ */
+ public interface SubscribeFilter<SECURITY_CONTEXT, PRINCIPAL>
+ {
+
+ /**
+ * Returns whether or not the principal is allowed to subscribe to this topic.
+ *
+ * @param securityContext the security context the servlet was constructed with
+ * @param principal the principal of the websocket asking to subscribe; null when the request carried no auth attribute
+ * @param topic the topic the websocket wants to subscribe to
+ * @return true to allow the subscription; false makes the servlet log a warning and ignore the request
+ */
+ boolean filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic);
+ }
+
+ /**
+ * Hook consulted by the servlet for every subscriber just before published data is sent to it.
+ *
+ * @param <SECURITY_CONTEXT> type of the security context the servlet was constructed with
+ * @param <PRINCIPAL> type of the principal attached to each websocket
+ */
+ public interface SendFilter<SECURITY_CONTEXT, PRINCIPAL>
+ {
+
+ /**
+ * Returns the data that should be sent to the given principal.
+ *
+ * @param securityContext the security context the servlet was constructed with
+ * @param principal the principal of the subscribing websocket; null when the request carried no auth attribute
+ * @param topic the topic the data is being published on
+ * @param data the data as it was published
+ * @return the data it should send to the websocket; the servlet sends this value in place of the published data
+ */
+ Object filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic, Object data);
+ }
+
+ /**
+ * Registers the filter that subscribe requests are checked against, replacing any filter registered earlier.
+ * While no filter is registered (or after null is passed here) every subscribe request is accepted.
+ *
+ * @param filter the subscribe filter to use, or null to disable subscribe filtering
+ */
+ public void registerSubscribeFilter(SubscribeFilter filter)
+ {
+ subscribeFilter = filter;
+ }
+
+ /**
+ * Registers the filter applied to published data before it is sent to each subscriber, replacing any
+ * filter registered earlier. While no filter is registered (or after null is passed here) subscribers
+ * receive the data exactly as published.
+ *
+ * @param filter the send filter to use, or null to disable send filtering
+ */
+ public void registerSendFilter(SendFilter filter)
+ {
+ sendFilter = filter;
+ }
+
+ /**
+ * Callback for PUBLISH messages whose topic starts with the internal topic prefix
+ * ({@code PubSubMessage.INTERNAL_TOPIC_PREFIX} followed by a dot).
+ */
+ public interface InternalMessageHandler
+ {
+ /**
+ * Called once for each internal-topic PUBLISH message received from a websocket.
+ *
+ * @param topic the full topic of the message, including the internal prefix
+ * @param data the payload of the message; may be null, the servlet does not check it before calling
+ */
+ void onMessage(String topic, Object data);
+
+ }
+
+ /**
+ * Creates the servlet.
+ *
+ * @param securityContext context object handed unchanged to the subscribe and send filters
+ * @param authAttribute name of the request attribute from which the principal of a new websocket is read
+ */
+ public PubSubWebSocketServlet(SECURITY_CONTEXT securityContext, String authAttribute)
+ {
+ this.securityContext = securityContext;
+ this.authAttribute = authAttribute;
+ }
+
+ /**
+ * Sets the handler notified about PUBLISH messages on internal topics. With no handler set
+ * (the default, or after passing null) such messages are not forwarded to any handler.
+ *
+ * @param internalMessageHandler the handler to notify, or null for none
+ */
+ public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
+ {
+ this.internalMessageHandler = internalMessageHandler;
+ }
+
+ /**
+ * Plain holder for a user name.
+ * NOTE(review): nothing in this class references it -- confirm whether external code still uses it.
+ */
+ public class UserHolder {
+ public String username;
+ }
+
+ /**
+ * Creates the websocket for an incoming connection and attaches to it the principal found in the
+ * request attribute named by {@code authAttribute}. The principal is null when that attribute is not set.
+ *
+ * @param request the HTTP request that asked for the websocket upgrade
+ * @param protocol the requested websocket sub-protocol; not used here
+ * @return a new websocket bound to the request's principal
+ */
+ @Override
+ public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
+ {
+ // The attribute is stored as Object; whoever sets it is trusted to store a PRINCIPAL, the cast cannot be checked at runtime.
+ @SuppressWarnings("unchecked")
+ PRINCIPAL principal = (PRINCIPAL) request.getAttribute(authAttribute);
+ return new PubSubWebSocket(principal);
+ }
+
+  /**
+   * Subscribes the websocket to the topic unless the registered subscribe filter rejects it, then
+   * publishes the topic's new subscriber count on its num-subscribers topic.
+   *
+   * @param webSocket the websocket to subscribe
+   * @param topic the topic to subscribe it to
+   */
+  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
+  {
+    boolean rejected = subscribeFilter != null && !subscribeFilter.filter(securityContext, webSocket.getPrincipal(), topic);
+    if (rejected) {
+      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
+      return;
+    }
+    LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+
+    // topic -> sockets direction of the index; the maps never hold null values, so a null lookup means "absent"
+    HashSet<PubSubWebSocket> subscribers = topicToSocketMap.get(topic);
+    if (subscribers == null) {
+      subscribers = new HashSet<PubSubWebSocket>();
+      topicToSocketMap.put(topic, subscribers);
+    }
+    subscribers.add(webSocket);
+
+    // socket -> topics direction of the index
+    HashSet<String> topics = socketToTopicMap.get(webSocket);
+    if (topics == null) {
+      topics = new HashSet<String>(0);
+      socketToTopicMap.put(webSocket, topics);
+    }
+    topics.add(topic);
+
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  /**
+   * Removes the websocket from the topic and publishes the topic's new subscriber count.
+   * Returns without publishing when the topic has no subscribers at all or the websocket has no
+   * subscriptions recorded.
+   *
+   * @param webSocket the websocket to unsubscribe
+   * @param topic the topic to remove it from
+   */
+  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
+  {
+    HashSet<PubSubWebSocket> subscribers = topicToSocketMap.get(topic);
+    if (subscribers == null) {
+      return;
+    }
+    subscribers.remove(webSocket);
+    if (subscribers.isEmpty()) {
+      // drop empty sets so the index does not accumulate dead topics
+      topicToSocketMap.remove(topic);
+    }
+
+    HashSet<String> topics = socketToTopicMap.get(webSocket);
+    if (topics == null) {
+      return;
+    }
+    topics.remove(topic);
+    if (topics.isEmpty()) {
+      socketToTopicMap.remove(webSocket);
+    }
+
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  /**
+   * Removes the websocket from every topic it is subscribed to and publishes the new subscriber
+   * count of each of those topics.
+   *
+   * This method can be re-entered for the same websocket: publishing a subscriber count may fail for
+   * this very socket (it can be subscribed to a num-subscribers topic too), and publish() then calls
+   * disconnect() again. The nested call may already have dropped a topic's subscriber set from the
+   * index, so a missing set is skipped instead of dereferenced.
+   *
+   * @param webSocket the websocket whose subscriptions are removed
+   */
+  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
+  {
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    if (topicSet == null) {
+      return;
+    }
+    for (String topic : topicSet) {
+      HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+      if (wsSet == null) {
+        // Already cleaned up by a re-entrant disconnect of this socket; nothing left to remove or announce.
+        continue;
+      }
+      wsSet.remove(webSocket);
+      if (wsSet.isEmpty()) {
+        topicToSocketMap.remove(topic);
+      }
+      publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+    }
+    socketToTopicMap.remove(webSocket);
+  }
+
+ /**
+ * Drops every subscription of the given websocket. Called when the websocket is closed and when
+ * sending to it fails during publish.
+ *
+ * @param webSocket the websocket that went away
+ */
+ private synchronized void disconnect(PubSubWebSocket webSocket)
+ {
+ unsubscribeAll(webSocket);
+ }
+
+  /**
+   * Returns how many websockets are currently subscribed to the topic.
+   *
+   * @param topic the topic to look up
+   * @return the number of subscribers, 0 when the topic has none
+   */
+  public synchronized int getNumSubscribers(String topic)
+  {
+    HashSet<PubSubWebSocket> subscribers = topicToSocketMap.get(topic);
+    if (subscribers == null) {
+      return 0;
+    }
+    return subscribers.size();
+  }
+
+  /**
+   * Wraps the data in a DATA message for the topic, serializes it with the codec and queues the
+   * resulting text on the websocket.
+   *
+   * @param webSocket the websocket to send to
+   * @param topic the topic the data belongs to
+   * @param data the payload
+   * @throws IOException declared for the codec's serialization step
+   */
+  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
+  {
+    PubSubMessage<Object> envelope = new PubSubMessage<Object>();
+    envelope.setType(PubSubMessageType.DATA);
+    envelope.setTopic(topic);
+    envelope.setData(data);
+    LOG.debug("Sending data {} to subscriber...", topic);
+    String wireText = codec.formatMessage(envelope);
+    webSocket.sendMessage(wireText);
+  }
+
+  /**
+   * Sends the data to every websocket subscribed to the topic, passing it through the send filter
+   * first when one is registered. Topics other than num-subscribers and internal topics are also
+   * recorded in the latest-topics cache. A websocket that cannot be sent to is disconnected.
+   *
+   * @param topic the topic to publish on
+   * @param data the data to publish
+   */
+  public synchronized void publish(String topic, Object data)
+  {
+    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+      latestTopics.put(topic, System.currentTimeMillis());
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    if (wsSet == null) {
+      return;
+    }
+    // Iterate over a snapshot: disconnect() below publishes subscriber counts, and a failure inside
+    // that nested publish can disconnect a second socket and remove it from this very set, which
+    // would make an iterator over the live set throw ConcurrentModificationException.
+    for (PubSubWebSocket socket : new HashSet<PubSubWebSocket>(wsSet)) {
+      if (!wsSet.contains(socket)) {
+        // dropped by a nested disconnect since the snapshot was taken
+        continue;
+      }
+      try {
+        Object payload = sendFilter == null ? data : sendFilter.filter(securityContext, socket.getPrincipal(), topic, data);
+        sendData(socket, topic, payload);
+      }
+      catch (Exception ex) {
+        LOG.error("Cannot send message", ex);
+        wsSet.remove(socket);
+        disconnect(socket);
+      }
+    }
+  }
+
+  /**
+   * One websocket connection to the pub-sub servlet. Incoming text messages are decoded and
+   * dispatched to the servlet's subscribe/unsubscribe/publish operations; outgoing messages are
+   * queued and written by a dedicated messenger thread.
+   */
+  protected class PubSubWebSocket implements WebSocket.OnTextMessage
+  {
+    private Connection connection;
+    // Bounded on purpose: sendMessage() fails once 1024 messages are waiting.
+    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
+    private final Thread messengerThread = new Thread(new Messenger());
+    private final PRINCIPAL principal;
+
+    /**
+     * @param principal the principal this connection acts as; may be null
+     */
+    public PubSubWebSocket(PRINCIPAL principal)
+    {
+      this.principal = principal;
+    }
+
+    /**
+     * @return the principal given at construction; may be null
+     */
+    public PRINCIPAL getPrincipal()
+    {
+      return principal;
+    }
+
+    /**
+     * Decodes one incoming text message and performs the operation it asks for. Messages that cannot
+     * be parsed, have no type, or lack a topic where one is needed are ignored; any exception is
+     * logged and swallowed so it does not propagate to the websocket container.
+     *
+     * @param message the raw text received from the client
+     */
+    @Override
+    public void onMessage(String message)
+    {
+      LOG.debug("Received message {}", message);
+      try {
+        @SuppressWarnings("unchecked")
+        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
+        if (pubSubMessage == null) {
+          return;
+        }
+        PubSubMessageType type = pubSubMessage.getType();
+        String topic = pubSubMessage.getTopic();
+        if (type == null) {
+          return;
+        }
+        if (type.equals(PubSubMessageType.SUBSCRIBE)) {
+          if (topic != null) {
+            subscribe(this, topic);
+          }
+        }
+        else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
+          if (topic != null) {
+            unsubscribe(this, topic);
+          }
+        }
+        else if (type.equals(PubSubMessageType.PUBLISH)) {
+          if (topic != null) {
+            Object data = pubSubMessage.getData();
+            if (data != null) {
+              publish(topic, data);
+            }
+            // Internal topics are additionally handed to the handler, even when data is null.
+            if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+              if (internalMessageHandler != null) {
+                internalMessageHandler.onMessage(topic, data);
+              }
+            }
+          }
+        }
+        else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
+          if (topic != null) {
+            subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+            // Send the current count right away so the client does not have to wait for the next change.
+            sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+          }
+        }
+        else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
+          if (topic != null) {
+            unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+          }
+        }
+        else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+          // latestTopics is written by publish() under the servlet's monitor, so it must be read under
+          // that same monitor; locking this websocket would not guard it.
+          synchronized (PubSubWebSocketServlet.this) {
+            sendData(this, "_latestTopics", latestTopics.keySet());
+          }
+        }
+      }
+      catch (Exception ex) {
+        LOG.warn("Exception caught", ex);
+      }
+    }
+
+    /**
+     * Stores the connection, configures its idle timeout and maximum text message size, and starts
+     * the messenger thread.
+     *
+     * @param connection the connection that was just opened
+     */
+    @Override
+    public void onOpen(Connection connection)
+    {
+      LOG.debug("onOpen");
+      this.connection = connection;
+      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minutes to clear out idle connections from taking resources
+      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
+      messengerThread.start();
+    }
+
+    /**
+     * Removes all subscriptions of this websocket and stops the messenger thread.
+     *
+     * @param i the close code reported by the container; not used
+     * @param string the close message reported by the container; not used
+     */
+    @Override
+    public void onClose(int i, String string)
+    {
+      LOG.debug("onClose");
+      disconnect(this);
+      messengerThread.interrupt();
+    }
+
+    /**
+     * Queues a message for delivery by the messenger thread without blocking.
+     *
+     * @param message the text to send
+     * @throws IllegalStateException when the outgoing queue is full
+     */
+    public void sendMessage(String message) throws IllegalStateException
+    {
+      messageQueue.add(message);
+    }
+
+    /*
+     * This class exists only because Jetty 8 does not support async write for websocket
+     *
+     */
+    private class Messenger implements Runnable
+    {
+      /**
+       * Takes queued messages one at a time and writes them to the connection until the thread is
+       * interrupted or a write fails.
+       */
+      @Override
+      public void run()
+      {
+        while (!Thread.interrupted()) {
+          try {
+            String message = messageQueue.take();
+            // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
+            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
+            // When we can use Java 7, we need to upgrade to Jetty 9.
+            connection.sendMessage(message);
+          }
+          catch (InterruptedException ex) {
+            // interrupted by onClose(): the connection is gone, stop quietly
+            return;
+          }
+          catch (Exception ex) {
+            LOG.error("Caught exception in websocket messenger.", ex);
+            return;
+          }
+        }
+      }
+
+    }
+
+  }
+
+}
[11/11] incubator-apex-core git commit: APEX-39 Added store and
embeddable interfaces for app data which allow query operators to be embedded
in a store. Also added detection of a store with an embeddable query
operator.
Posted by th...@apache.org.
APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d748ed46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d748ed46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d748ed46
Branch: refs/heads/devel-3
Commit: d748ed46fb2ef91d74a70c2be52dc4a56bb4df71
Parents: aceaeeb
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 7 17:47:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:55 2015 -0700
----------------------------------------------------------------------
.../common/experimental/AppData.java | 53 +++++++++++++++++++-
.../stram/StreamingContainerManager.java | 31 ++++++++++--
2 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index fbdc82c..22259c5 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -15,13 +15,15 @@
*/
package com.datatorrent.common.experimental;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-
import org.apache.hadoop.classification.InterfaceStability;
/**
@@ -33,6 +35,55 @@ import org.apache.hadoop.classification.InterfaceStability;
public interface AppData
{
/**
+ * This interface is for App Data stores which support embedding a query operator.
+ * @param <QUERY_TYPE> The type of the query tuple emitted by the embedded query operator.
+ */
+ interface Store<QUERY_TYPE> extends Operator.ActivationListener<OperatorContext>
+ {
+ /**
+ * Gets the query connector which is used by the store operator to receive queries. If this method returns
+ * {@code null} then this Store should have a separate query operator connected to it.
+ * @return The query connector which is used by the store operator to receive queries, or {@code null} if no
+ * query operator is embedded.
+ */
+ public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
+
+ /**
+ * Sets the query connector which is used by the store operator to receive queries. The store operator will call
+ * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method of the embeddable query operator before
+ * its {@link Operator#setup} method is called.
+ * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries.
+ */
+ public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
+ }
+
+ /**
+ * This interface represents a query operator which can be embedded into an AppData data source. This operator could also
+ * be used as a standalone operator. The distinction between being used in a standalone or embedded context is made by
+ * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method. If this method is called at least once then the {@link EmbeddableQueryInfoProvider}
+ * will operate as if it were embedded in an {@link AppData.Store} operator. If this method is never called then the operator will behave as if
+ * it were a standalone operator.<br/><br/>
+ * <b>Note:</b> When an {@link EmbeddableQueryInfoProvider} is set on an {@link AppData.Store} then its {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+ * method is called before {@link Operator#setup}.
+ * @param <QUERY_TYPE> The type of the query emitted by the operator.
+ */
+ interface EmbeddableQueryInfoProvider<QUERY_TYPE> extends Operator, ConnectionInfoProvider, Operator.ActivationListener<OperatorContext>
+ {
+ /**
+ * Gets the output port for queries.
+ * @return The output port for queries.
+ */
+ public DefaultOutputPort<QUERY_TYPE> getOutputPort();
+
+ /**
+ * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}.
+ * If this method is never called then this operator will behave as a standalone operator. When an {@link EmbeddableQueryInfoProvider}
+ * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+ * method once before the {@link Operator#setup} method is called.
+ */
+ public void enableEmbeddedMode();
+ }
+
+ /**
* This interface should be implemented by AppData Query and Result operators.
*/
interface ConnectionInfoProvider
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7002c1d..7944a4b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -132,6 +132,7 @@ public class StreamingContainerManager implements PlanContext
public final static String APP_META_FILENAME = "meta.json";
public final static String APP_META_KEY_ATTRIBUTES = "attributes";
public final static String APP_META_KEY_METRICS = "metrics";
+ public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes
public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
@@ -567,6 +568,22 @@ public class StreamingContainerManager implements PlanContext
String queryUrl = null;
String queryTopic = null;
+ boolean hasEmbeddedQuery = false;
+
+ //Discover embeddable query connectors
+ if (operatorMeta.getOperator() instanceof AppData.Store<?>) {
+ AppData.Store<?> store = (AppData.Store<?>)operatorMeta.getOperator();
+ AppData.EmbeddableQueryInfoProvider<?> embeddableQuery = store.getEmbeddableQueryInfoProvider();
+
+ if (embeddableQuery != null) {
+ hasEmbeddedQuery = true;
+ queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
+ queryUrl = embeddableQuery.getAppDataURL();
+ queryTopic = embeddableQuery.getTopic();
+ }
+ }
+
+ //Discover separate query operators
LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
LogicalPlan.InputPortMeta portMeta = entry.getKey();
@@ -574,10 +591,16 @@ public class StreamingContainerManager implements PlanContext
if (queryUrl == null) {
OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta();
if (queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
- AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider) queryOperatorMeta.getOperator();
- queryOperatorName = queryOperatorMeta.getName();
- queryUrl = queryOperator.getAppDataURL();
- queryTopic = queryOperator.getTopic();
+ if (!hasEmbeddedQuery) {
+ AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator();
+ queryOperatorName = queryOperatorMeta.getName();
+ queryUrl = queryOperator.getAppDataURL();
+ queryTopic = queryOperator.getTopic();
+ } else {
+ LOG.warn("An embeddable query connector and the {} query operator were discovered. " +
+ "The query operator will be ignored and the embeddable query connector will be used instead.",
+ operatorMeta.getName());
+ }
}
} else {
LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());
[06/11] incubator-apex-core git commit: SPOI-2695 #resolve renamed
simple authentication to password authentication
Posted by th...@apache.org.
SPOI-2695 #resolve renamed simple authentication to password authentication
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/26c05205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/26c05205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/26c05205
Branch: refs/heads/devel-3
Commit: 26c0520552b6d6100d9382ee310c175a44380681
Parents: 55844f8
Author: David Yan <da...@datatorrent.com>
Authored: Wed Aug 13 16:13:46 2014 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 108 +++++++++++++++++++++++++++++++++------
1 file changed, 91 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/26c05205/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21b48f7..ba1ac9c 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,13 +4,16 @@
*/
package com.datatorrent.gateway;
+import com.datatorrent.gateway.security.AuthDatabase;
import com.datatorrent.gateway.security.AuthenticationException;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.websocket.WebSocket;
@@ -18,17 +21,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.gateway.security.DTPrincipal;
import com.datatorrent.lib.util.JacksonObjectMapperProvider;
import com.datatorrent.lib.util.PubSubMessage;
import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.lib.util.PubSubMessageCodec;
-
import com.datatorrent.stram.util.LRUCache;
-import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response.Status;
+
/**
* <p>PubSubWebSocketServlet class.</p>
@@ -47,6 +47,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private InternalMessageHandler internalMessageHandler = null;
private static final int latestTopicCount = 100;
private final DTGateway gateway;
+ private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal";
+ private SubscribeFilter subscribeFilter;
+ private SendFilter sendFilter;
private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
{
private static final long serialVersionUID = 20140131L;
@@ -60,6 +63,45 @@ public class PubSubWebSocketServlet extends WebSocketServlet
};
+ public interface SubscribeFilter
+ {
+
+ /**
+ * Returns whether or not the principal is allowed to subscribe to this topic
+ *
+ * @param gateway
+ * @param principal
+ * @param topic
+ * @return
+ */
+ boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
+ }
+
+ public interface SendFilter
+ {
+
+ /**
+ * Returns the data it should be sent given the principal
+ *
+ * @param gateway
+ * @param principal
+ * @param topic
+ * @param data
+ * @return the data it should send to the websocket
+ */
+ Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data);
+ }
+
+ public void registerSubscribeFilter(SubscribeFilter filter)
+ {
+ subscribeFilter = filter;
+ }
+
+ public void registerSendFilter(SendFilter filter)
+ {
+ sendFilter = filter;
+ }
+
public interface InternalMessageHandler
{
void onMessage(String topic, Object data);
@@ -79,36 +121,50 @@ public class PubSubWebSocketServlet extends WebSocketServlet
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
- if ("simple".equals(gateway.getWebAuthType())) {
+ if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
Cookie[] cookies = request.getCookies();
if (cookies != null) {
for (Cookie cookie : cookies) {
if ("session".equals(cookie.getName())) {
try {
- gateway.getAuthDatabase().authenticateSession(cookie.getValue());
+ AuthDatabase auth = gateway.getAuthDatabase();
+ DTPrincipal principal = auth.authenticateSession(cookie.getValue());
+ request.setAttribute(AUTH_ATTRIBUTE, principal);
}
catch (AuthenticationException ex) {
- throw new WebApplicationException(ex, Status.FORBIDDEN);
+ /* commenting this out to allow anonymous publish from stram
+ throw new WebApplicationException(ex, Status.FORBIDDEN);
+ */
}
- super.service(request, response);
+ //super.service(request, response);
}
}
}
- throw new WebApplicationException(Status.FORBIDDEN);
- }
- else {
- super.service(request, response);
+ /* commenting this out to allow anonymous publish from stram
+ throw new WebApplicationException(Status.UNAUTHORIZED);
+ */
}
+
+ super.service(request, response);
}
@Override
- public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
+ public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
- return new PubSubWebSocket();
+ DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
+ return new PubSubWebSocket(principal);
}
private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
{
+ if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(), topic)) {
+ LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
+ return;
+ }
+ else {
+ LOG.info("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+ }
+
HashSet<PubSubWebSocket> wsSet;
if (!topicToSocketMap.containsKey(topic)) {
wsSet = new HashSet<PubSubWebSocket>();
@@ -185,7 +241,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
pubSubMessage.setType(PubSubMessageType.DATA);
pubSubMessage.setTopic(topic);
pubSubMessage.setData(data);
- LOG.debug("Sending data of {} to subscriber...", topic);
+ LOG.debug("Sending data {} to subscriber...", topic);
webSocket.sendMessage(codec.formatMessage(pubSubMessage));
}
@@ -200,9 +256,16 @@ public class PubSubWebSocketServlet extends WebSocketServlet
while (it.hasNext()) {
PubSubWebSocket socket = it.next();
try {
- sendData(socket, topic, data);
+ if (sendFilter != null) {
+ Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic, data);
+ sendData(socket, topic, filteredData);
+ }
+ else {
+ sendData(socket, topic, data);
+ }
}
catch (Exception ex) {
+ LOG.error("Cannot send message", ex);
it.remove();
disconnect(socket);
}
@@ -215,6 +278,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private Connection connection;
private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
private final Thread messengerThread = new Thread(new Messenger());
+ private final DTPrincipal principal;
+
+ public PubSubWebSocket(DTPrincipal principal)
+ {
+ this.principal = principal;
+ }
+
+ public DTPrincipal getPrincipal()
+ {
+ return principal;
+ }
@Override
public void onMessage(String message)
[05/11] incubator-apex-core git commit: Handling JAAS and kerberos
auth for pubsub
Posted by th...@apache.org.
Handling JAAS and kerberos auth for pubsub
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c0b1178b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c0b1178b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c0b1178b
Branch: refs/heads/devel-3
Commit: c0b1178b46c109b53774b7a0df60fab44c95602f
Parents: 26c0520
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Dec 20 21:12:09 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 47 ++++++++++++++++++++++++----------------
1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c0b1178b/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index ba1ac9c..7693d92 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,14 +4,15 @@
*/
package com.datatorrent.gateway;
-import com.datatorrent.gateway.security.AuthDatabase;
-import com.datatorrent.gateway.security.AuthenticationException;
import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -21,13 +22,15 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.gateway.security.DTPrincipal;
import com.datatorrent.lib.util.JacksonObjectMapperProvider;
import com.datatorrent.lib.util.PubSubMessage;
import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
import com.datatorrent.lib.util.PubSubMessageCodec;
+
+import com.datatorrent.gateway.security.AuthDatabase;
+import com.datatorrent.gateway.security.AuthenticationException;
+import com.datatorrent.gateway.security.DTPrincipal;
import com.datatorrent.stram.util.LRUCache;
-import javax.servlet.http.Cookie;
/**
@@ -121,30 +124,36 @@ public class PubSubWebSocketServlet extends WebSocketServlet
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
- if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
- Cookie[] cookies = request.getCookies();
- if (cookies != null) {
- for (Cookie cookie : cookies) {
- if ("session".equals(cookie.getName())) {
- try {
- AuthDatabase auth = gateway.getAuthDatabase();
- DTPrincipal principal = auth.authenticateSession(cookie.getValue());
- request.setAttribute(AUTH_ATTRIBUTE, principal);
- }
- catch (AuthenticationException ex) {
+ DTPrincipal principal = null;
+ AuthDatabase auth = gateway.getAuthDatabase();
+ if (gateway.isDTSessionHandled()) {
+ //if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
+ Cookie[] cookies = request.getCookies();
+ if (cookies != null) {
+ for (Cookie cookie : cookies) {
+ if ("session".equals(cookie.getName())) {
+ try {
+ principal = auth.authenticateSession(cookie.getValue());
+ //request.setAttribute(AUTH_ATTRIBUTE, principal);
+ } catch (AuthenticationException ex) {
/* commenting this out to allow anonymous publish from stram
throw new WebApplicationException(ex, Status.FORBIDDEN);
*/
+ }
+ //super.service(request, response);
}
- //super.service(request, response);
}
}
- }
/* commenting this out to allow anonymous publish from stram
throw new WebApplicationException(Status.UNAUTHORIZED);
*/
+ //}
+ } else if (gateway.isHadoopAuthFilterHandled()){
+ principal = auth.getUser(request.getUserPrincipal().getName());
+ }
+ if (principal != null) {
+ request.setAttribute(AUTH_ATTRIBUTE, principal);
}
-
super.service(request, response);
}
[09/11] incubator-apex-core git commit: SPOI-5559 fixed NPE in auto
publish of apps
Posted by th...@apache.org.
SPOI-5559 fixed NPE in auto publish of apps
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/74884441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/74884441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/74884441
Branch: refs/heads/devel-3
Commit: 74884441e642f06f3f3576291c23796d6cddc57e
Parents: 0913456
Author: David Yan <da...@datatorrent.com>
Authored: Mon Jul 20 13:02:56 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/74884441/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 842f7f6..b039f44 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -194,7 +194,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
return;
}
else {
- LOG.info("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+ LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
}
HashSet<PubSubWebSocket> wsSet;
[08/11] incubator-apex-core git commit: added
JSONSerializationProvider so that it knows how to serialize logical plan
Posted by th...@apache.org.
added JSONSerializationProvider so that it knows how to serialize logical plan
Conflicts:
gateway/src/main/java/com/datatorrent/gateway/resources/ws/v2/AppPackagesResource.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/09134568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/09134568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/09134568
Branch: refs/heads/devel-3
Commit: 091345688a1b95bf3071f604058a9ebfc9f688f2
Parents: 34b92cb
Author: David Yan <da...@datatorrent.com>
Authored: Thu Feb 12 16:08:47 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/09134568/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index f628364..842f7f6 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -25,14 +25,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.lib.util.JacksonObjectMapperProvider;
-import com.datatorrent.lib.util.PubSubMessage;
-import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.lib.util.PubSubMessageCodec;
+import com.datatorrent.common.util.PubSubMessage;
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.common.util.PubSubMessageCodec;
import com.datatorrent.gateway.security.AuthDatabase;
import com.datatorrent.gateway.security.AuthenticationException;
import com.datatorrent.gateway.security.DTPrincipal;
+import com.datatorrent.stram.util.JSONSerializationProvider;
import com.datatorrent.stram.util.LRUCache;
@@ -48,7 +48,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private static final long serialVersionUID = 1L;
private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
- private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
+ private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
private InternalMessageHandler internalMessageHandler = null;
private static final int latestTopicCount = 100;
@@ -308,7 +308,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private class PubSubWebSocket implements WebSocket.OnTextMessage
{
private Connection connection;
- private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+ private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
private final Thread messengerThread = new Thread(new Messenger());
private final DTPrincipal principal;
@@ -385,7 +385,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet
{
LOG.debug("onOpen");
this.connection = connection;
- this.connection.setMaxIdleTime(60 * 60 * 1000); // idle time set to one hour to clear out idle connections from taking resources
+ this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minutes to clear out idle connections from taking resources
+ this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
messengerThread.start();
}
[02/11] incubator-apex-core git commit: SPOI-1770 exclude special
topics in latest topic list
Posted by th...@apache.org.
SPOI-1770 exclude special topics in latest topic list
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9a9f1538
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9a9f1538
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9a9f1538
Branch: refs/heads/devel-3
Commit: 9a9f15381607c34b56e600a8083bdd41caf6f137
Parents: 4e5c9e5
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:12:04 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 38 ++++++++++++++++++++++----------------
1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9a9f1538/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21bfd73..11bf98e 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,23 +4,26 @@
*/
package com.datatorrent.gateway;
-import com.datatorrent.api.util.JacksonObjectMapperProvider;
-import com.datatorrent.api.util.PubSubMessage;
-import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.api.util.PubSubMessageCodec;
-import com.datatorrent.stram.util.LRUCache;
-
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+
import javax.servlet.http.HttpServletRequest;
+
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.util.JacksonObjectMapperProvider;
+import com.datatorrent.lib.util.PubSubMessage;
+import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.lib.util.PubSubMessageCodec;
+
+import com.datatorrent.stram.util.LRUCache;
+
/**
* <p>PubSubWebSocketServlet class.</p>
*
@@ -37,8 +40,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
private InternalMessageHandler internalMessageHandler = null;
private static final int latestTopicCount = 100;
- private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+ private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
{
+ private static final long serialVersionUID = 20140131L;
@Override
public Long put(String key, Long value)
{
@@ -100,7 +104,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
topicSet = socketToTopicMap.get(webSocket);
}
topicSet.add(topic);
- publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
}
private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
@@ -121,7 +125,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
if (topicSet.isEmpty()) {
socketToTopicMap.remove(webSocket);
}
- publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
}
private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
@@ -134,7 +138,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
if (wsSet.isEmpty()) {
topicToSocketMap.remove(topic);
}
- publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
}
socketToTopicMap.remove(webSocket);
}
@@ -163,7 +167,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
public synchronized void publish(String topic, Object data)
{
- latestTopics.put(topic, System.currentTimeMillis());
+ if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+ latestTopics.put(topic, System.currentTimeMillis());
+ }
HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
if (wsSet != null) {
Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -183,8 +189,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet
private class PubSubWebSocket implements WebSocket.OnTextMessage
{
private Connection connection;
- private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
- private Thread messengerThread = new Thread(new Messenger());
+ private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+ private final Thread messengerThread = new Thread(new Messenger());
@Override
public void onMessage(String message)
@@ -222,13 +228,13 @@ public class PubSubWebSocketServlet extends WebSocketServlet
}
else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
if (topic != null) {
- subscribe(this, topic + ".numSubscribers");
- sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+ subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+ sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
}
}
else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
if (topic != null) {
- unsubscribe(this, topic + ".numSubscribers");
+ unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
}
}
else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
[07/11] incubator-apex-core git commit: Calling filter to
authenticate kerberos token for pubsub
Posted by th...@apache.org.
Calling filter to authenticate kerberos token for pubsub
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/34b92cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/34b92cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/34b92cb8
Branch: refs/heads/devel-3
Commit: 34b92cb8cce49444a91f8bc5a272da7222a91b7e
Parents: c0b1178
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Dec 20 22:06:11 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700
----------------------------------------------------------------------
PubSubWebSocketServlet.java | 33 ++++++++++++++++++++++++++++-----
1 file changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/34b92cb8/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 7693d92..f628364 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -11,7 +11,10 @@ import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import javax.servlet.FilterChain;
import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -124,6 +127,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
+ boolean handled = false;
DTPrincipal principal = null;
AuthDatabase auth = gateway.getAuthDatabase();
if (gateway.isDTSessionHandled()) {
@@ -148,13 +152,32 @@ public class PubSubWebSocketServlet extends WebSocketServlet
throw new WebApplicationException(Status.UNAUTHORIZED);
*/
//}
- } else if (gateway.isHadoopAuthFilterHandled()){
- principal = auth.getUser(request.getUserPrincipal().getName());
+ } else if (gateway.isHadoopAuthFilterHandled()) {
+ final UserHolder userHolder = new UserHolder();
+ gateway.getHadoopAuthFilter().doFilter(request, response, new FilterChain()
+ {
+ @Override
+ public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException, ServletException
+ {
+ userHolder.username = ((HttpServletRequest)servletRequest).getUserPrincipal().getName();
+ }
+ });
+ if (response.getStatus() == HttpServletResponse.SC_OK) {
+ principal = auth.getUser(userHolder.username);
+ } else {
+ handled = true;
+ }
}
- if (principal != null) {
- request.setAttribute(AUTH_ATTRIBUTE, principal);
+ if (!handled) {
+ if (principal != null) {
+ request.setAttribute(AUTH_ATTRIBUTE, principal);
+ }
+ super.service(request, response);
}
- super.service(request, response);
+ }
+
+ private class UserHolder {
+ public String username;
}
@Override