You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/29 00:02:17 UTC

[01/11] incubator-apex-core git commit: SPOI-2255 #resolve password protect the websocket

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 a3e861ef4 -> d748ed46f


SPOI-2255 #resolve password protect the websocket


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/55844f86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/55844f86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/55844f86

Branch: refs/heads/devel-3
Commit: 55844f86af24a46907a8441787bddcf8a437e182
Parents: 9a9f153
Author: David Yan <da...@datatorrent.com>
Authored: Mon Apr 7 14:40:02 2014 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 50 +++++++++++++++++++++++++++++-----------
 1 file changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/55844f86/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 11bf98e..21b48f7 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,6 +4,7 @@
  */
 package com.datatorrent.gateway;
 
+import com.datatorrent.gateway.security.AuthenticationException;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -23,6 +24,11 @@ import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.lib.util.PubSubMessageCodec;
 
 import com.datatorrent.stram.util.LRUCache;
+import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response.Status;
 
 /**
  * <p>PubSubWebSocketServlet class.</p>
@@ -40,9 +46,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
+  private final DTGateway gateway;
   private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
   {
     private static final long serialVersionUID = 20140131L;
+
     @Override
     public Long put(String key, Long value)
     {
@@ -58,26 +66,42 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   }
 
-  /*
-   private int timeout;
-
-   public void setTimeout(int timeout) {
-   this.timeout = timeout;
-   }
-   */
+  public PubSubWebSocketServlet(DTGateway gateway)
+  {
+    this.gateway = gateway;
+  }
 
-  /*
-   private int timeout;
-   public void setTimeout(int timeout) {
-   this.timeout = timeout;
-   }
-   */
   public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
   {
     this.internalMessageHandler = internalMessageHandler;
   }
 
   @Override
+  protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
+  {
+    if ("simple".equals(gateway.getWebAuthType())) {
+      Cookie[] cookies = request.getCookies();
+      if (cookies != null) {
+        for (Cookie cookie : cookies) {
+          if ("session".equals(cookie.getName())) {
+            try {
+              gateway.getAuthDatabase().authenticateSession(cookie.getValue());
+            }
+            catch (AuthenticationException ex) {
+              throw new WebApplicationException(ex, Status.FORBIDDEN);
+            }
+            super.service(request, response);
+          }
+        }
+      }
+      throw new WebApplicationException(Status.FORBIDDEN);
+    }
+    else {
+      super.service(request, response);
+    }
+  }
+
+  @Override
   public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
   {
     return new PubSubWebSocket();


[03/11] incubator-apex-core git commit: SPOI-1770 exposing list of latest 100 topics

Posted by th...@apache.org.
SPOI-1770 exposing list of latest 100 topics


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4e5c9e56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4e5c9e56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4e5c9e56

Branch: refs/heads/devel-3
Commit: 4e5c9e561f34aed94f6f205f8b13cd290419f5a9
Parents: 0e3a272
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:02:44 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4e5c9e56/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index cd7a71f..21bfd73 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -8,11 +8,10 @@ import com.datatorrent.api.util.JacksonObjectMapperProvider;
 import com.datatorrent.api.util.PubSubMessage;
 import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.api.util.PubSubMessageCodec;
+import com.datatorrent.stram.util.LRUCache;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import javax.servlet.http.HttpServletRequest;
@@ -37,6 +36,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
   private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
+  private static final int latestTopicCount = 100;
+  private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+  {
+    @Override
+    public Long put(String key, Long value)
+    {
+      remove(key); // this is to make the key the most recently inserted entry
+      return super.put(key, value);
+    }
+
+  };
 
   public interface InternalMessageHandler
   {
@@ -153,6 +163,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   public synchronized void publish(String topic, Object data)
   {
+    latestTopics.put(topic, System.currentTimeMillis());
     HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
     if (wsSet != null) {
       Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -220,6 +231,11 @@ public class PubSubWebSocketServlet extends WebSocketServlet
                 unsubscribe(this, topic + ".numSubscribers");
               }
             }
+            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+              synchronized (this) {
+                sendData(this, "_latestTopics", latestTopics.keySet());
+              }
+            }
           }
         }
       }


[04/11] incubator-apex-core git commit: change name and jump version

Posted by th...@apache.org.
change name and jump version


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0e3a2728
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0e3a2728
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0e3a2728

Branch: refs/heads/devel-3
Commit: 0e3a2728aaab37fc1ce938efef33649899f166a3
Parents: a3e861e
Author: David Yan <da...@datatorrent.com>
Authored: Tue Oct 15 20:00:51 2013 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 284 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 284 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0e3a2728/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
new file mode 100644
index 0000000..cd7a71f
--- /dev/null
+++ b/PubSubWebSocketServlet.java
@@ -0,0 +1,284 @@
+/*
+ *  Copyright (c) 2012-2013 DataTorrent, Inc.
+ *  All Rights Reserved.
+ */
+package com.datatorrent.gateway;
+
+import com.datatorrent.api.util.JacksonObjectMapperProvider;
+import com.datatorrent.api.util.PubSubMessage;
+import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.api.util.PubSubMessageCodec;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import javax.servlet.http.HttpServletRequest;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>PubSubWebSocketServlet class.</p>
+ *
+ * @author David Yan <da...@datatorrent.com>
+ * @since 0.3.2
+ */
+public class PubSubWebSocketServlet extends WebSocketServlet
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
+  private static final long serialVersionUID = 1L;
+  private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
+  private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
+  private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
+  private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
+  private InternalMessageHandler internalMessageHandler = null;
+
+  public interface InternalMessageHandler
+  {
+    void onMessage(String topic, Object data);
+
+  }
+
+  /*
+   private int timeout;
+
+   public void setTimeout(int timeout) {
+   this.timeout = timeout;
+   }
+   */
+
+  /*
+   private int timeout;
+   public void setTimeout(int timeout) {
+   this.timeout = timeout;
+   }
+   */
+  public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
+  {
+    this.internalMessageHandler = internalMessageHandler;
+  }
+
+  @Override
+  public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
+  {
+    return new PubSubWebSocket();
+  }
+
+  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
+  {
+    HashSet<PubSubWebSocket> wsSet;
+    if (!topicToSocketMap.containsKey(topic)) {
+      wsSet = new HashSet<PubSubWebSocket>();
+      topicToSocketMap.put(topic, wsSet);
+    }
+    else {
+      wsSet = topicToSocketMap.get(topic);
+    }
+    wsSet.add(webSocket);
+
+    HashSet<String> topicSet;
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      topicSet = new HashSet<String>(0);
+      socketToTopicMap.put(webSocket, topicSet);
+    }
+    else {
+      topicSet = socketToTopicMap.get(webSocket);
+    }
+    topicSet.add(topic);
+    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+  }
+
+  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
+  {
+    if (!topicToSocketMap.containsKey(topic)) {
+      return;
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    wsSet.remove(webSocket);
+    if (wsSet.isEmpty()) {
+      topicToSocketMap.remove(topic);
+    }
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      return;
+    }
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    topicSet.remove(topic);
+    if (topicSet.isEmpty()) {
+      socketToTopicMap.remove(webSocket);
+    }
+    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+  }
+
+  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
+  {
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    if (topicSet != null) {
+      for (String topic : topicSet) {
+        HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+        wsSet.remove(webSocket);
+        if (wsSet.isEmpty()) {
+          topicToSocketMap.remove(topic);
+        }
+        publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+      }
+      socketToTopicMap.remove(webSocket);
+    }
+  }
+
+  private synchronized void disconnect(PubSubWebSocket webSocket)
+  {
+    unsubscribeAll(webSocket);
+  }
+
+  public synchronized int getNumSubscribers(String topic)
+  {
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    return wsSet == null ? 0 : wsSet.size();
+  }
+
+  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
+  {
+    PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
+    pubSubMessage.setType(PubSubMessageType.DATA);
+    pubSubMessage.setTopic(topic);
+    pubSubMessage.setData(data);
+    LOG.debug("Sending data of {} to subscriber...", topic);
+    webSocket.sendMessage(codec.formatMessage(pubSubMessage));
+  }
+
+  public synchronized void publish(String topic, Object data)
+  {
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    if (wsSet != null) {
+      Iterator<PubSubWebSocket> it = wsSet.iterator();
+      while (it.hasNext()) {
+        PubSubWebSocket socket = it.next();
+        try {
+          sendData(socket, topic, data);
+        }
+        catch (Exception ex) {
+          it.remove();
+          disconnect(socket);
+        }
+      }
+    }
+  }
+
+  private class PubSubWebSocket implements WebSocket.OnTextMessage
+  {
+    private Connection connection;
+    private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+    private Thread messengerThread = new Thread(new Messenger());
+
+    @Override
+    public void onMessage(String message)
+    {
+      LOG.debug("Received message {}", message);
+      try {
+        @SuppressWarnings("unchecked")
+        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
+        if (pubSubMessage != null) {
+          PubSubMessageType type = pubSubMessage.getType();
+          String topic = pubSubMessage.getTopic();
+          if (type != null) {
+            if (type.equals(PubSubMessageType.SUBSCRIBE)) {
+              if (topic != null) {
+                subscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
+              if (topic != null) {
+                unsubscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.PUBLISH)) {
+              if (topic != null) {
+                Object data = pubSubMessage.getData();
+                if (data != null) {
+                  publish(topic, data);
+                }
+                if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+                  if (internalMessageHandler != null) {
+                    internalMessageHandler.onMessage(topic, data);
+                  }
+                }
+              }
+            }
+            else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                subscribe(this, topic + ".numSubscribers");
+                sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                unsubscribe(this, topic + ".numSubscribers");
+              }
+            }
+          }
+        }
+      }
+      catch (Exception ex) {
+        LOG.warn("Exception caught", ex);
+      }
+    }
+
+    @Override
+    public void onOpen(Connection connection)
+    {
+      LOG.debug("onOpen");
+      this.connection = connection;
+      this.connection.setMaxIdleTime(60 * 60 * 1000); // idle time set to one hour to clear out idle connections from taking resources
+      messengerThread.start();
+    }
+
+    @Override
+    public void onClose(int i, String string)
+    {
+      LOG.debug("onClose");
+      disconnect(this);
+      messengerThread.interrupt();
+    }
+
+    public void sendMessage(String message) throws IllegalStateException
+    {
+      messageQueue.add(message);
+    }
+
+    /*
+     * This class exists only because Jetty 8 does not support async write for websocket
+     *
+     */
+    private class Messenger implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        while (!Thread.interrupted()) {
+          try {
+            String message = messageQueue.take();
+            // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
+            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
+            // When we can use Java 7, we need to upgrade to Jetty 9.
+            connection.sendMessage(message);
+          }
+          catch (InterruptedException ex) {
+            return;
+          }
+          catch (Exception ex) {
+            LOG.error("Caught exception in websocket messenger.", ex);
+            return;
+          }
+        }
+      }
+
+    }
+
+  }
+
+}


[10/11] incubator-apex-core git commit: APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.

Posted by th...@apache.org.
APEX-26 Generalized PubSubWebsocketServlet for use in Apex and dependent projects.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/aceaeebe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/aceaeebe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/aceaeebe

Branch: refs/heads/devel-3
Commit: aceaeebe2eda27337bc2dc38f71c11156f983bdb
Parents: 7488444
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 10:29:22 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:42 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java                     | 437 -------------------
 engine/pom.xml                                  |   1 -
 .../stram/util/PubSubWebSocketServlet.java      | 376 ++++++++++++++++
 3 files changed, 376 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
deleted file mode 100644
index b039f44..0000000
--- a/PubSubWebSocketServlet.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- *  Copyright (c) 2012-2013 DataTorrent, Inc.
- *  All Rights Reserved.
- */
-package com.datatorrent.gateway;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-import javax.servlet.FilterChain;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.codehaus.jackson.map.ObjectMapper;
-import org.eclipse.jetty.websocket.WebSocket;
-import org.eclipse.jetty.websocket.WebSocketServlet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.common.util.PubSubMessage;
-import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.common.util.PubSubMessageCodec;
-
-import com.datatorrent.gateway.security.AuthDatabase;
-import com.datatorrent.gateway.security.AuthenticationException;
-import com.datatorrent.gateway.security.DTPrincipal;
-import com.datatorrent.stram.util.JSONSerializationProvider;
-import com.datatorrent.stram.util.LRUCache;
-
-
-/**
- * <p>PubSubWebSocketServlet class.</p>
- *
- * @author David Yan <da...@datatorrent.com>
- * @since 0.3.2
- */
-public class PubSubWebSocketServlet extends WebSocketServlet
-{
-  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
-  private static final long serialVersionUID = 1L;
-  private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
-  private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
-  private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
-  private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
-  private InternalMessageHandler internalMessageHandler = null;
-  private static final int latestTopicCount = 100;
-  private final DTGateway gateway;
-  private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal";
-  private SubscribeFilter subscribeFilter;
-  private SendFilter sendFilter;
-  private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
-  {
-    private static final long serialVersionUID = 20140131L;
-
-    @Override
-    public Long put(String key, Long value)
-    {
-      remove(key); // this is to make the key the most recently inserted entry
-      return super.put(key, value);
-    }
-
-  };
-
-  public interface SubscribeFilter
-  {
-
-    /**
-     * Returns whether or not the principal is allowed to subscribe to this topic
-     *
-     * @param gateway
-     * @param principal
-     * @param topic
-     * @return
-     */
-    boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
-  }
-
-  public interface SendFilter
-  {
-
-    /**
-     * Returns the data it should be sent given the principal
-     *
-     * @param gateway
-     * @param principal
-     * @param topic
-     * @param data
-     * @return the data it should send to the websocket
-     */
-    Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data);
-  }
-
-  public void registerSubscribeFilter(SubscribeFilter filter)
-  {
-    subscribeFilter = filter;
-  }
-
-  public void registerSendFilter(SendFilter filter)
-  {
-    sendFilter = filter;
-  }
-
-  public interface InternalMessageHandler
-  {
-    void onMessage(String topic, Object data);
-
-  }
-
-  public PubSubWebSocketServlet(DTGateway gateway)
-  {
-    this.gateway = gateway;
-  }
-
-  public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
-  {
-    this.internalMessageHandler = internalMessageHandler;
-  }
-
-  @Override
-  protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
-  {
-    boolean handled = false;
-    DTPrincipal principal = null;
-    AuthDatabase auth = gateway.getAuthDatabase();
-    if (gateway.isDTSessionHandled()) {
-      //if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
-        Cookie[] cookies = request.getCookies();
-        if (cookies != null) {
-          for (Cookie cookie : cookies) {
-            if ("session".equals(cookie.getName())) {
-              try {
-                principal = auth.authenticateSession(cookie.getValue());
-                //request.setAttribute(AUTH_ATTRIBUTE, principal);
-              } catch (AuthenticationException ex) {
-              /* commenting this out to allow anonymous publish from stram
-               throw new WebApplicationException(ex, Status.FORBIDDEN);
-               */
-              }
-              //super.service(request, response);
-            }
-          }
-        }
-      /* commenting this out to allow anonymous publish from stram
-       throw new WebApplicationException(Status.UNAUTHORIZED);
-       */
-      //}
-    } else if (gateway.isHadoopAuthFilterHandled()) {
-      final UserHolder userHolder = new UserHolder();
-      gateway.getHadoopAuthFilter().doFilter(request, response, new FilterChain()
-      {
-        @Override
-        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException, ServletException
-        {
-          userHolder.username = ((HttpServletRequest)servletRequest).getUserPrincipal().getName();
-        }
-      });
-      if (response.getStatus() == HttpServletResponse.SC_OK) {
-        principal = auth.getUser(userHolder.username);
-      } else {
-        handled = true;
-      }
-    }
-    if (!handled) {
-      if (principal != null) {
-        request.setAttribute(AUTH_ATTRIBUTE, principal);
-      }
-      super.service(request, response);
-    }
-  }
-
-  private class UserHolder {
-    public String username;
-  }
-
-  @Override
-  public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
-  {
-    DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
-    return new PubSubWebSocket(principal);
-  }
-
-  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
-  {
-    if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(), topic)) {
-      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
-      return;
-    }
-    else {
-      LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
-    }
-
-    HashSet<PubSubWebSocket> wsSet;
-    if (!topicToSocketMap.containsKey(topic)) {
-      wsSet = new HashSet<PubSubWebSocket>();
-      topicToSocketMap.put(topic, wsSet);
-    }
-    else {
-      wsSet = topicToSocketMap.get(topic);
-    }
-    wsSet.add(webSocket);
-
-    HashSet<String> topicSet;
-    if (!socketToTopicMap.containsKey(webSocket)) {
-      topicSet = new HashSet<String>(0);
-      socketToTopicMap.put(webSocket, topicSet);
-    }
-    else {
-      topicSet = socketToTopicMap.get(webSocket);
-    }
-    topicSet.add(topic);
-    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-  }
-
-  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
-  {
-    if (!topicToSocketMap.containsKey(topic)) {
-      return;
-    }
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    wsSet.remove(webSocket);
-    if (wsSet.isEmpty()) {
-      topicToSocketMap.remove(topic);
-    }
-    if (!socketToTopicMap.containsKey(webSocket)) {
-      return;
-    }
-    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
-    topicSet.remove(topic);
-    if (topicSet.isEmpty()) {
-      socketToTopicMap.remove(webSocket);
-    }
-    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-  }
-
-  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
-  {
-    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
-    if (topicSet != null) {
-      for (String topic : topicSet) {
-        HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-        wsSet.remove(webSocket);
-        if (wsSet.isEmpty()) {
-          topicToSocketMap.remove(topic);
-        }
-        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-      }
-      socketToTopicMap.remove(webSocket);
-    }
-  }
-
-  private synchronized void disconnect(PubSubWebSocket webSocket)
-  {
-    unsubscribeAll(webSocket);
-  }
-
-  public synchronized int getNumSubscribers(String topic)
-  {
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    return wsSet == null ? 0 : wsSet.size();
-  }
-
-  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
-  {
-    PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
-    pubSubMessage.setType(PubSubMessageType.DATA);
-    pubSubMessage.setTopic(topic);
-    pubSubMessage.setData(data);
-    LOG.debug("Sending data {} to subscriber...", topic);
-    webSocket.sendMessage(codec.formatMessage(pubSubMessage));
-  }
-
-  public synchronized void publish(String topic, Object data)
-  {
-    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
-      latestTopics.put(topic, System.currentTimeMillis());
-    }
-    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
-    if (wsSet != null) {
-      Iterator<PubSubWebSocket> it = wsSet.iterator();
-      while (it.hasNext()) {
-        PubSubWebSocket socket = it.next();
-        try {
-          if (sendFilter != null) {
-            Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic, data);
-            sendData(socket, topic, filteredData);
-          }
-          else {
-            sendData(socket, topic, data);
-          }
-        }
-        catch (Exception ex) {
-          LOG.error("Cannot send message", ex);
-          it.remove();
-          disconnect(socket);
-        }
-      }
-    }
-  }
-
-  private class PubSubWebSocket implements WebSocket.OnTextMessage
-  {
-    private Connection connection;
-    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
-    private final Thread messengerThread = new Thread(new Messenger());
-    private final DTPrincipal principal;
-
-    public PubSubWebSocket(DTPrincipal principal)
-    {
-      this.principal = principal;
-    }
-
-    public DTPrincipal getPrincipal()
-    {
-      return principal;
-    }
-
-    @Override
-    public void onMessage(String message)
-    {
-      LOG.debug("Received message {}", message);
-      try {
-        @SuppressWarnings("unchecked")
-        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
-        if (pubSubMessage != null) {
-          PubSubMessageType type = pubSubMessage.getType();
-          String topic = pubSubMessage.getTopic();
-          if (type != null) {
-            if (type.equals(PubSubMessageType.SUBSCRIBE)) {
-              if (topic != null) {
-                subscribe(this, topic);
-              }
-            }
-            else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
-              if (topic != null) {
-                unsubscribe(this, topic);
-              }
-            }
-            else if (type.equals(PubSubMessageType.PUBLISH)) {
-              if (topic != null) {
-                Object data = pubSubMessage.getData();
-                if (data != null) {
-                  publish(topic, data);
-                }
-                if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
-                  if (internalMessageHandler != null) {
-                    internalMessageHandler.onMessage(topic, data);
-                  }
-                }
-              }
-            }
-            else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
-              if (topic != null) {
-                subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
-                sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
-              }
-            }
-            else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
-              if (topic != null) {
-                unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
-              }
-            }
-            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
-              synchronized (this) {
-                sendData(this, "_latestTopics", latestTopics.keySet());
-              }
-            }
-          }
-        }
-      }
-      catch (Exception ex) {
-        LOG.warn("Exception caught", ex);
-      }
-    }
-
-    @Override
-    public void onOpen(Connection connection)
-    {
-      LOG.debug("onOpen");
-      this.connection = connection;
-      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear out idle connections from taking resources
-      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
-      messengerThread.start();
-    }
-
-    @Override
-    public void onClose(int i, String string)
-    {
-      LOG.debug("onClose");
-      disconnect(this);
-      messengerThread.interrupt();
-    }
-
-    public void sendMessage(String message) throws IllegalStateException
-    {
-      messageQueue.add(message);
-    }
-
-    /*
-     * This class exists only because Jetty 8 does not support async write for websocket
-     *
-     */
-    private class Messenger implements Runnable
-    {
-      @Override
-      public void run()
-      {
-        while (!Thread.interrupted()) {
-          try {
-            String message = messageQueue.take();
-            // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
-            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
-            // When we can use Java 7, we need to upgrade to Jetty 9.
-            connection.sendMessage(message);
-          }
-          catch (InterruptedException ex) {
-            return;
-          }
-          catch (Exception ex) {
-            LOG.error("Caught exception in websocket messenger.", ex);
-            return;
-          }
-        }
-      }
-
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 21eaa00..2b8f66a 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -272,7 +272,6 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-websocket</artifactId>
       <version>${jetty.version}</version>
-      <scope>test</scope>
     </dependency>
     <!-- use shaded asm library to avoid conflict -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/aceaeebe/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
new file mode 100644
index 0000000..d8c3df8
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketServlet.java
@@ -0,0 +1,376 @@
+/*
+ *  Copyright (c) 2012-2013 DataTorrent, Inc.
+ *  All Rights Reserved.
+ */
+package com.datatorrent.stram.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocket.Connection;
+import org.eclipse.jetty.websocket.WebSocketServlet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.common.util.PubSubMessage;
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.common.util.PubSubMessageCodec;
+
+
+/**
+ * <p>PubSubWebSocketServlet class.</p>
+ *
+ * @author David Yan <da...@datatorrent.com>
+ * @since 0.3.2
+ */
+public class PubSubWebSocketServlet<SECURITY_CONTEXT, PRINCIPAL> extends WebSocketServlet
+{
+  private static final Logger LOG = LoggerFactory.getLogger(PubSubWebSocketServlet.class);
+  private static final long serialVersionUID = 1L;
+  private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
+  private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
+  private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
+  private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
+  private InternalMessageHandler internalMessageHandler = null;
+  private static final int latestTopicCount = 100;
+  protected SECURITY_CONTEXT securityContext;
+  private SubscribeFilter subscribeFilter;
+  private SendFilter sendFilter;
+  private String authAttribute;
+  private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+  {
+    private static final long serialVersionUID = 20140131L;
+
+    @Override
+    public Long put(String key, Long value)
+    {
+      remove(key); // this is to make the key the most recently inserted entry
+      return super.put(key, value);
+    }
+
+  };
+
+  public interface SubscribeFilter<SECURITY_CONTEXT, PRINCIPAL>
+  {
+
+    /**
+     * Returns whether or not the principal is allowed to subscribe to this topic
+     *
+     * @param securityContext
+     * @param principal
+     * @param topic
+     * @return
+     */
+    boolean filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic);
+  }
+
+  public interface SendFilter<SECURITY_CONTEXT, PRINCIPAL>
+  {
+
+    /**
+     * Returns the data it should be sent given the principal
+     *
+     * @param securityContext
+     * @param principal
+     * @param topic
+     * @param data
+     * @return the data it should send to the websocket
+     */
+    Object filter(SECURITY_CONTEXT securityContext, PRINCIPAL principal, String topic, Object data);
+  }
+
+  public void registerSubscribeFilter(SubscribeFilter filter)
+  {
+    subscribeFilter = filter;
+  }
+
+  public void registerSendFilter(SendFilter filter)
+  {
+    sendFilter = filter;
+  }
+
+  public interface InternalMessageHandler
+  {
+    void onMessage(String topic, Object data);
+
+  }
+
+  public PubSubWebSocketServlet(SECURITY_CONTEXT securityContext, String authAttribute)
+  {
+    this.securityContext = securityContext;
+    this.authAttribute = authAttribute;
+  }
+
+  public void setInternalMessageHandler(InternalMessageHandler internalMessageHandler)
+  {
+    this.internalMessageHandler = internalMessageHandler;
+  }
+
+  public class UserHolder {
+    public String username;
+  }
+
+  @Override
+  public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
+  {
+    @SuppressWarnings("unchecked")
+    PRINCIPAL principal = (PRINCIPAL) request.getAttribute(authAttribute);
+    return new PubSubWebSocket(principal);
+  }
+
+  private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
+  {
+    if (subscribeFilter != null && !subscribeFilter.filter(securityContext, webSocket.getPrincipal(), topic)) {
+      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
+      return;
+    }
+    else {
+      LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+    }
+
+    HashSet<PubSubWebSocket> wsSet;
+    if (!topicToSocketMap.containsKey(topic)) {
+      wsSet = new HashSet<PubSubWebSocket>();
+      topicToSocketMap.put(topic, wsSet);
+    }
+    else {
+      wsSet = topicToSocketMap.get(topic);
+    }
+    wsSet.add(webSocket);
+
+    HashSet<String> topicSet;
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      topicSet = new HashSet<String>(0);
+      socketToTopicMap.put(webSocket, topicSet);
+    }
+    else {
+      topicSet = socketToTopicMap.get(webSocket);
+    }
+    topicSet.add(topic);
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
+  {
+    if (!topicToSocketMap.containsKey(topic)) {
+      return;
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    wsSet.remove(webSocket);
+    if (wsSet.isEmpty()) {
+      topicToSocketMap.remove(topic);
+    }
+    if (!socketToTopicMap.containsKey(webSocket)) {
+      return;
+    }
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    topicSet.remove(topic);
+    if (topicSet.isEmpty()) {
+      socketToTopicMap.remove(webSocket);
+    }
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+  }
+
+  private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
+  {
+    HashSet<String> topicSet = socketToTopicMap.get(webSocket);
+    if (topicSet != null) {
+      for (String topic : topicSet) {
+        HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+        wsSet.remove(webSocket);
+        if (wsSet.isEmpty()) {
+          topicToSocketMap.remove(topic);
+        }
+        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+      }
+      socketToTopicMap.remove(webSocket);
+    }
+  }
+
+  private synchronized void disconnect(PubSubWebSocket webSocket)
+  {
+    unsubscribeAll(webSocket);
+  }
+
+  public synchronized int getNumSubscribers(String topic)
+  {
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    return wsSet == null ? 0 : wsSet.size();
+  }
+
+  private synchronized void sendData(PubSubWebSocket webSocket, String topic, Object data) throws IOException
+  {
+    PubSubMessage<Object> pubSubMessage = new PubSubMessage<Object>();
+    pubSubMessage.setType(PubSubMessageType.DATA);
+    pubSubMessage.setTopic(topic);
+    pubSubMessage.setData(data);
+    LOG.debug("Sending data {} to subscriber...", topic);
+    webSocket.sendMessage(codec.formatMessage(pubSubMessage));
+  }
+
+  public synchronized void publish(String topic, Object data)
+  {
+    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+      latestTopics.put(topic, System.currentTimeMillis());
+    }
+    HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
+    if (wsSet != null) {
+      Iterator<PubSubWebSocket> it = wsSet.iterator();
+      while (it.hasNext()) {
+        PubSubWebSocket socket = it.next();
+        try {
+          if (sendFilter != null) {
+            Object filteredData = sendFilter.filter(securityContext, socket.getPrincipal(), topic, data);
+            sendData(socket, topic, filteredData);
+          }
+          else {
+            sendData(socket, topic, data);
+          }
+        }
+        catch (Exception ex) {
+          LOG.error("Cannot send message", ex);
+          it.remove();
+          disconnect(socket);
+        }
+      }
+    }
+  }
+
+  protected class PubSubWebSocket implements WebSocket.OnTextMessage
+  {
+    private Connection connection;
+    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
+    private final Thread messengerThread = new Thread(new Messenger());
+    private final PRINCIPAL principal;
+
+    public PubSubWebSocket(PRINCIPAL principal)
+    {
+      this.principal = principal;
+    }
+
+    public PRINCIPAL getPrincipal()
+    {
+      return principal;
+    }
+
+    @Override
+    public void onMessage(String message)
+    {
+      LOG.debug("Received message {}", message);
+      try {
+        @SuppressWarnings("unchecked")
+        PubSubMessage<Object> pubSubMessage = codec.parseMessage(message);
+        if (pubSubMessage != null) {
+          PubSubMessageType type = pubSubMessage.getType();
+          String topic = pubSubMessage.getTopic();
+          if (type != null) {
+            if (type.equals(PubSubMessageType.SUBSCRIBE)) {
+              if (topic != null) {
+                subscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE)) {
+              if (topic != null) {
+                unsubscribe(this, topic);
+              }
+            }
+            else if (type.equals(PubSubMessageType.PUBLISH)) {
+              if (topic != null) {
+                Object data = pubSubMessage.getData();
+                if (data != null) {
+                  publish(topic, data);
+                }
+                if (topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+                  if (internalMessageHandler != null) {
+                    internalMessageHandler.onMessage(topic, data);
+                  }
+                }
+              }
+            }
+            else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+                sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
+              }
+            }
+            else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
+              if (topic != null) {
+                unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+              }
+            }
+            else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {
+              synchronized (this) {
+                sendData(this, "_latestTopics", latestTopics.keySet());
+              }
+            }
+          }
+        }
+      }
+      catch (Exception ex) {
+        LOG.warn("Exception caught", ex);
+      }
+    }
+
+    @Override
+    public void onOpen(Connection connection)
+    {
+      LOG.debug("onOpen");
+      this.connection = connection;
+      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear out idle connections from taking resources
+      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
+      messengerThread.start();
+    }
+
+    @Override
+    public void onClose(int i, String string)
+    {
+      LOG.debug("onClose");
+      disconnect(this);
+      messengerThread.interrupt();
+    }
+
+    public void sendMessage(String message) throws IllegalStateException
+    {
+      messageQueue.add(message);
+    }
+
+    /*
+     * This class exists only because Jetty 8 does not support async write for websocket
+     *
+     */
+    private class Messenger implements Runnable
+    {
+      @Override
+      public void run()
+      {
+        while (!Thread.interrupted()) {
+          try {
+            String message = messageQueue.take();
+            // This call sendMessage() is blocking. This is why we have this messenger thread per connection so that one bad connection will not affect another
+            // Jetty 9 has async calls but we can't use Jetty 9 because it requires Java 7
+            // When we can use Java 7, we need to upgrade to Jetty 9.
+            connection.sendMessage(message);
+          }
+          catch (InterruptedException ex) {
+            return;
+          }
+          catch (Exception ex) {
+            LOG.error("Caught exception in websocket messenger.", ex);
+            return;
+          }
+        }
+      }
+
+    }
+
+  }
+
+}


[11/11] incubator-apex-core git commit: APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.

Posted by th...@apache.org.
APEX-39 Added store and embeddable interfaces for app data which allow query operators to be embedded in a store. Also added detection of a store with an embeddable query operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d748ed46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d748ed46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d748ed46

Branch: refs/heads/devel-3
Commit: d748ed46fb2ef91d74a70c2be52dc4a56bb4df71
Parents: aceaeeb
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Fri Aug 7 17:47:46 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 14:19:55 2015 -0700

----------------------------------------------------------------------
 .../common/experimental/AppData.java            | 53 +++++++++++++++++++-
 .../stram/StreamingContainerManager.java        | 31 ++++++++++--
 2 files changed, 79 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/common/src/main/java/com/datatorrent/common/experimental/AppData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/experimental/AppData.java b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
index fbdc82c..22259c5 100644
--- a/common/src/main/java/com/datatorrent/common/experimental/AppData.java
+++ b/common/src/main/java/com/datatorrent/common/experimental/AppData.java
@@ -15,13 +15,15 @@
  */
 package com.datatorrent.common.experimental;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
@@ -33,6 +35,55 @@ import org.apache.hadoop.classification.InterfaceStability;
 public interface AppData
 {
   /**
+   * This interface is for App Data stores which support embedding a query operator.
+   * @param <QUERY_TYPE> The type of the query tuple emitted by the embedded query operator.
+   */
+  interface Store<QUERY_TYPE> extends Operator.ActivationListener<OperatorContext>
+  {
+    /**
+     * Gets the query connector which is used by the store operator to receive queries. If this method returns
+     * null then this Store should have a separate query operator connected to it.
+     * @return The query connector which is used by the store operator to receive queries.
+     */
+    public EmbeddableQueryInfoProvider<QUERY_TYPE> getEmbeddableQueryInfoProvider();
+
+    /**
+     * Sets the query connector which is used by the store operator to receive queries. The store operator will call
+     * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method of the embeddable query operator before
+     * its {@link Operator#setup} method is called.
+     * @param embeddableQueryInfoProvider The query connector which is used by the store operator to receive queries.
+     */
+    public void setEmbeddableQueryInfoProvider(EmbeddableQueryInfoProvider<QUERY_TYPE> embeddableQueryInfoProvider);
+  }
+
+  /**
+   * This interface represents a query operator which can be embedded into an AppData data source. This operator could also
+   * be used as a standalone operator. The distinction between being used in a standalone or embedded context is made by
+   * the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode} method. If this method is called at least once then the {@link EmbeddableQueryInfoProvider}
+   * will operate as if it were embedded in an {@link AppData.Store} operator. If this method is never called then the operator will behave as if
+   * it were a standalone operator.<br/><br/>
+   * <b>Note:</b> When an {@link EmbeddableQueryInfoProvider} is set on an {@link AppData.Store} then it's {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+   * method is called before {@link Operator#setup}.
+   * @param <QUERY_TYPE> The type of the query emitted by the operator.
+   */
+  interface EmbeddableQueryInfoProvider<QUERY_TYPE> extends Operator, ConnectionInfoProvider, Operator.ActivationListener<OperatorContext>
+  {
+    /**
+     * Gets the output port for queries.
+     * @return The output port for queries.
+     */
+    public DefaultOutputPort<QUERY_TYPE> getOutputPort();
+
+    /**
+     * If this method is called at least once then this operator will work as if it were embedded in an {@link AppData.Store}.
+     * If this method is never called then this operator will behave as a standalone operator. When an {@link EmbeddableQueryInfoProvider}
+     * is set on an {@link AppData.Store} then the {@link AppData.Store} will call the {@link EmbeddableQueryInfoProvider#enableEmbeddedMode}
+     * method once before the {@link Operator.setup} is called.
+     */
+    public void enableEmbeddedMode();
+  }
+
+  /**
    * This interface should be implemented by AppData Query and Result operators.
    */
   interface ConnectionInfoProvider

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d748ed46/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7002c1d..7944a4b 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -132,6 +132,7 @@ public class StreamingContainerManager implements PlanContext
   public final static String APP_META_FILENAME = "meta.json";
   public final static String APP_META_KEY_ATTRIBUTES = "attributes";
   public final static String APP_META_KEY_METRICS = "metrics";
+  public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
 
   public final static long LATENCY_WARNING_THRESHOLD_MILLIS = 10 * 60 * 1000; // 10 minutes
   public final static Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
@@ -567,6 +568,22 @@ public class StreamingContainerManager implements PlanContext
           String queryUrl = null;
           String queryTopic = null;
 
+          boolean hasEmbeddedQuery = false;
+
+          //Discover embeddable query connectors
+          if (operatorMeta.getOperator() instanceof AppData.Store<?>) {
+            AppData.Store<?> store = (AppData.Store<?>)operatorMeta.getOperator();
+            AppData.EmbeddableQueryInfoProvider<?> embeddableQuery = store.getEmbeddableQueryInfoProvider();
+
+            if (embeddableQuery != null) {
+              hasEmbeddedQuery = true;
+              queryOperatorName = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
+              queryUrl = embeddableQuery.getAppDataURL();
+              queryTopic = embeddableQuery.getTopic();
+            }
+          }
+
+          //Discover separate query operators
           LOG.warn("DEBUG: looking at operator {} {}", operatorMeta.getName(), Thread.currentThread().getId());
           for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : inputStreams.entrySet()) {
             LogicalPlan.InputPortMeta portMeta = entry.getKey();
@@ -574,10 +591,16 @@ public class StreamingContainerManager implements PlanContext
               if (queryUrl == null) {
                 OperatorMeta queryOperatorMeta = entry.getValue().getSource().getOperatorMeta();
                 if (queryOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
-                  AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider) queryOperatorMeta.getOperator();
-                  queryOperatorName = queryOperatorMeta.getName();
-                  queryUrl = queryOperator.getAppDataURL();
-                  queryTopic = queryOperator.getTopic();
+                  if (!hasEmbeddedQuery) {
+                    AppData.ConnectionInfoProvider queryOperator = (AppData.ConnectionInfoProvider)queryOperatorMeta.getOperator();
+                    queryOperatorName = queryOperatorMeta.getName();
+                    queryUrl = queryOperator.getAppDataURL();
+                    queryTopic = queryOperator.getTopic();
+                  } else {
+                    LOG.warn("An embeddable query connector and the {} query operator were discovered. " +
+                             "The query operator will be ignored and the embeddable query connector will be used instead.",
+                             operatorMeta.getName());
+                  }
                 }
               } else {
                 LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());


[06/11] incubator-apex-core git commit: SPOI-2695 #resolve renamed simple authentication to password authentication

Posted by th...@apache.org.
SPOI-2695 #resolve renamed simple authentication to password authentication


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/26c05205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/26c05205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/26c05205

Branch: refs/heads/devel-3
Commit: 26c0520552b6d6100d9382ee310c175a44380681
Parents: 55844f8
Author: David Yan <da...@datatorrent.com>
Authored: Wed Aug 13 16:13:46 2014 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 108 +++++++++++++++++++++++++++++++++------
 1 file changed, 91 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/26c05205/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21b48f7..ba1ac9c 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,13 +4,16 @@
  */
 package com.datatorrent.gateway;
 
+import com.datatorrent.gateway.security.AuthDatabase;
 import com.datatorrent.gateway.security.AuthenticationException;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.websocket.WebSocket;
@@ -18,17 +21,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.gateway.security.DTPrincipal;
 import com.datatorrent.lib.util.JacksonObjectMapperProvider;
 import com.datatorrent.lib.util.PubSubMessage;
 import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.lib.util.PubSubMessageCodec;
-
 import com.datatorrent.stram.util.LRUCache;
-import javax.servlet.ServletException;
 import javax.servlet.http.Cookie;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response.Status;
+
 
 /**
  * <p>PubSubWebSocketServlet class.</p>
@@ -47,6 +47,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
   private final DTGateway gateway;
+  private static final String AUTH_ATTRIBUTE = "com.datatorrent.auth.principal";
+  private SubscribeFilter subscribeFilter;
+  private SendFilter sendFilter;
   private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
   {
     private static final long serialVersionUID = 20140131L;
@@ -60,6 +63,45 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   };
 
+  public interface SubscribeFilter
+  {
+
+    /**
+     * Returns whether or not the principal is allowed to subscribe to this topic
+     *
+     * @param gateway
+     * @param principal
+     * @param topic
+     * @return
+     */
+    boolean filter(DTGateway gateway, DTPrincipal principal, String topic);
+  }
+
+  public interface SendFilter
+  {
+
+    /**
+     * Returns the data it should be sent given the principal
+     *
+     * @param gateway
+     * @param principal
+     * @param topic
+     * @param data
+     * @return the data it should send to the websocket
+     */
+    Object filter(DTGateway gateway, DTPrincipal principal, String topic, Object data);
+  }
+
+  public void registerSubscribeFilter(SubscribeFilter filter)
+  {
+    subscribeFilter = filter;
+  }
+
+  public void registerSendFilter(SendFilter filter)
+  {
+    sendFilter = filter;
+  }
+
   public interface InternalMessageHandler
   {
     void onMessage(String topic, Object data);
@@ -79,36 +121,50 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   @Override
   protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
   {
-    if ("simple".equals(gateway.getWebAuthType())) {
+    if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
       Cookie[] cookies = request.getCookies();
       if (cookies != null) {
         for (Cookie cookie : cookies) {
           if ("session".equals(cookie.getName())) {
             try {
-              gateway.getAuthDatabase().authenticateSession(cookie.getValue());
+              AuthDatabase auth = gateway.getAuthDatabase();
+              DTPrincipal principal = auth.authenticateSession(cookie.getValue());
+              request.setAttribute(AUTH_ATTRIBUTE, principal);
             }
             catch (AuthenticationException ex) {
-              throw new WebApplicationException(ex, Status.FORBIDDEN);
+              /* commenting this out to allow anonymous publish from stram
+               throw new WebApplicationException(ex, Status.FORBIDDEN);
+               */
             }
-            super.service(request, response);
+            //super.service(request, response);
           }
         }
       }
-      throw new WebApplicationException(Status.FORBIDDEN);
-    }
-    else {
-      super.service(request, response);
+      /* commenting this out to allow anonymous publish from stram
+       throw new WebApplicationException(Status.UNAUTHORIZED);
+       */
     }
+     
+    super.service(request, response);
   }
 
   @Override
-  public WebSocket doWebSocketConnect(HttpServletRequest hsr, String protocol)
+  public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
   {
-    return new PubSubWebSocket();
+    DTPrincipal principal = (DTPrincipal)request.getAttribute(AUTH_ATTRIBUTE);
+    return new PubSubWebSocket(principal);
   }
 
   private synchronized void subscribe(PubSubWebSocket webSocket, String topic)
   {
+    if (subscribeFilter != null && !subscribeFilter.filter(gateway, webSocket.getPrincipal(), topic)) {
+      LOG.warn("Subscribe filter returns false for topic {}, user {}. Ignoring subscribe request", topic, webSocket.getPrincipal());
+      return;
+    }
+    else {
+      LOG.info("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+    }
+
     HashSet<PubSubWebSocket> wsSet;
     if (!topicToSocketMap.containsKey(topic)) {
       wsSet = new HashSet<PubSubWebSocket>();
@@ -185,7 +241,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     pubSubMessage.setType(PubSubMessageType.DATA);
     pubSubMessage.setTopic(topic);
     pubSubMessage.setData(data);
-    LOG.debug("Sending data of {} to subscriber...", topic);
+    LOG.debug("Sending data {} to subscriber...", topic);
     webSocket.sendMessage(codec.formatMessage(pubSubMessage));
   }
 
@@ -200,9 +256,16 @@ public class PubSubWebSocketServlet extends WebSocketServlet
       while (it.hasNext()) {
         PubSubWebSocket socket = it.next();
         try {
-          sendData(socket, topic, data);
+          if (sendFilter != null) {
+            Object filteredData = sendFilter.filter(gateway, socket.getPrincipal(), topic, data);
+            sendData(socket, topic, filteredData);
+          }
+          else {
+            sendData(socket, topic, data);
+          }
         }
         catch (Exception ex) {
+          LOG.error("Cannot send message", ex);
           it.remove();
           disconnect(socket);
         }
@@ -215,6 +278,17 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     private Connection connection;
     private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
     private final Thread messengerThread = new Thread(new Messenger());
+    private final DTPrincipal principal;
+
+    public PubSubWebSocket(DTPrincipal principal)
+    {
+      this.principal = principal;
+    }
+
+    public DTPrincipal getPrincipal()
+    {
+      return principal;
+    }
 
     @Override
     public void onMessage(String message)


[05/11] incubator-apex-core git commit: Handling JAAS and kerberos auth for pubsub

Posted by th...@apache.org.
Handling JAAS and kerberos auth for pubsub


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c0b1178b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c0b1178b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c0b1178b

Branch: refs/heads/devel-3
Commit: c0b1178b46c109b53774b7a0df60fab44c95602f
Parents: 26c0520
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Dec 20 21:12:09 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 47 ++++++++++++++++++++++++----------------
 1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c0b1178b/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index ba1ac9c..7693d92 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,14 +4,15 @@
  */
 package com.datatorrent.gateway;
 
-import com.datatorrent.gateway.security.AuthDatabase;
-import com.datatorrent.gateway.security.AuthenticationException;
 import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
 import javax.servlet.ServletException;
+import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
@@ -21,13 +22,15 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.gateway.security.DTPrincipal;
 import com.datatorrent.lib.util.JacksonObjectMapperProvider;
 import com.datatorrent.lib.util.PubSubMessage;
 import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
 import com.datatorrent.lib.util.PubSubMessageCodec;
+
+import com.datatorrent.gateway.security.AuthDatabase;
+import com.datatorrent.gateway.security.AuthenticationException;
+import com.datatorrent.gateway.security.DTPrincipal;
 import com.datatorrent.stram.util.LRUCache;
-import javax.servlet.http.Cookie;
 
 
 /**
@@ -121,30 +124,36 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   @Override
   protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
   {
-    if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
-      Cookie[] cookies = request.getCookies();
-      if (cookies != null) {
-        for (Cookie cookie : cookies) {
-          if ("session".equals(cookie.getName())) {
-            try {
-              AuthDatabase auth = gateway.getAuthDatabase();
-              DTPrincipal principal = auth.authenticateSession(cookie.getValue());
-              request.setAttribute(AUTH_ATTRIBUTE, principal);
-            }
-            catch (AuthenticationException ex) {
+    DTPrincipal principal = null;
+    AuthDatabase auth = gateway.getAuthDatabase();
+    if (gateway.isDTSessionHandled()) {
+      //if (DTGateway.WEB_AUTH_TYPE_PASSWORD.equals(gateway.getWebAuthType())) {
+        Cookie[] cookies = request.getCookies();
+        if (cookies != null) {
+          for (Cookie cookie : cookies) {
+            if ("session".equals(cookie.getName())) {
+              try {
+                principal = auth.authenticateSession(cookie.getValue());
+                //request.setAttribute(AUTH_ATTRIBUTE, principal);
+              } catch (AuthenticationException ex) {
               /* commenting this out to allow anonymous publish from stram
                throw new WebApplicationException(ex, Status.FORBIDDEN);
                */
+              }
+              //super.service(request, response);
             }
-            //super.service(request, response);
           }
         }
-      }
       /* commenting this out to allow anonymous publish from stram
        throw new WebApplicationException(Status.UNAUTHORIZED);
        */
+      //}
+    } else if (gateway.isHadoopAuthFilterHandled()){
+      principal = auth.getUser(request.getUserPrincipal().getName());
+    }
+    if (principal != null) {
+      request.setAttribute(AUTH_ATTRIBUTE, principal);
     }
-     
     super.service(request, response);
   }
 


[09/11] incubator-apex-core git commit: SPOI-5559 fixed NPE in auto publish of apps

Posted by th...@apache.org.
SPOI-5559 fixed NPE in auto publish of apps


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/74884441
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/74884441
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/74884441

Branch: refs/heads/devel-3
Commit: 74884441e642f06f3f3576291c23796d6cddc57e
Parents: 0913456
Author: David Yan <da...@datatorrent.com>
Authored: Mon Jul 20 13:02:56 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/74884441/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 842f7f6..b039f44 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -194,7 +194,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
       return;
     }
     else {
-      LOG.info("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
+      LOG.debug("Subscribe is allowed for topic {}, user {}", topic, webSocket.getPrincipal());
     }
 
     HashSet<PubSubWebSocket> wsSet;


[08/11] incubator-apex-core git commit: added JSONSerializationProvider so that it knows how to serialize logical plan

Posted by th...@apache.org.
added JSONSerializationProvider so that it knows how to serialize logical plan

Conflicts:
	gateway/src/main/java/com/datatorrent/gateway/resources/ws/v2/AppPackagesResource.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/09134568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/09134568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/09134568

Branch: refs/heads/devel-3
Commit: 091345688a1b95bf3071f604058a9ebfc9f688f2
Parents: 34b92cb
Author: David Yan <da...@datatorrent.com>
Authored: Thu Feb 12 16:08:47 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/09134568/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index f628364..842f7f6 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -25,14 +25,14 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.util.JacksonObjectMapperProvider;
-import com.datatorrent.lib.util.PubSubMessage;
-import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.lib.util.PubSubMessageCodec;
+import com.datatorrent.common.util.PubSubMessage;
+import com.datatorrent.common.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.common.util.PubSubMessageCodec;
 
 import com.datatorrent.gateway.security.AuthDatabase;
 import com.datatorrent.gateway.security.AuthenticationException;
 import com.datatorrent.gateway.security.DTPrincipal;
+import com.datatorrent.stram.util.JSONSerializationProvider;
 import com.datatorrent.stram.util.LRUCache;
 
 
@@ -48,7 +48,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private static final long serialVersionUID = 1L;
   private HashMap<String, HashSet<PubSubWebSocket>> topicToSocketMap = new HashMap<String, HashSet<PubSubWebSocket>>();
   private HashMap<PubSubWebSocket, HashSet<String>> socketToTopicMap = new HashMap<PubSubWebSocket, HashSet<String>>();
-  private ObjectMapper mapper = (new JacksonObjectMapperProvider()).getContext(null);
+  private ObjectMapper mapper = (new JSONSerializationProvider()).getContext(null);
   private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
@@ -308,7 +308,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private class PubSubWebSocket implements WebSocket.OnTextMessage
   {
     private Connection connection;
-    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(1024);
     private final Thread messengerThread = new Thread(new Messenger());
     private final DTPrincipal principal;
 
@@ -385,7 +385,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     {
       LOG.debug("onOpen");
       this.connection = connection;
-      this.connection.setMaxIdleTime(60 * 60 * 1000); // idle time set to one hour to clear out idle connections from taking resources
+      this.connection.setMaxIdleTime(5 * 60 * 1000); // idle time set to five minute to clear out idle connections from taking resources
+      this.connection.setMaxTextMessageSize(8 * 1024 * 1024); // allow larger text message
       messengerThread.start();
     }
 


[02/11] incubator-apex-core git commit: SPOI-1770 exclude special topics in latest topic list

Posted by th...@apache.org.
SPOI-1770 exclude special topics in latest topic list


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9a9f1538
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9a9f1538
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9a9f1538

Branch: refs/heads/devel-3
Commit: 9a9f15381607c34b56e600a8083bdd41caf6f137
Parents: 4e5c9e5
Author: David Yan <da...@datatorrent.com>
Authored: Thu Jan 16 18:12:04 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:56 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 38 ++++++++++++++++++++++----------------
 1 file changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9a9f1538/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 21bfd73..11bf98e 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -4,23 +4,26 @@
  */
 package com.datatorrent.gateway;
 
-import com.datatorrent.api.util.JacksonObjectMapperProvider;
-import com.datatorrent.api.util.PubSubMessage;
-import com.datatorrent.api.util.PubSubMessage.PubSubMessageType;
-import com.datatorrent.api.util.PubSubMessageCodec;
-import com.datatorrent.stram.util.LRUCache;
-
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+
 import javax.servlet.http.HttpServletRequest;
+
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.websocket.WebSocket;
 import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.util.JacksonObjectMapperProvider;
+import com.datatorrent.lib.util.PubSubMessage;
+import com.datatorrent.lib.util.PubSubMessage.PubSubMessageType;
+import com.datatorrent.lib.util.PubSubMessageCodec;
+
+import com.datatorrent.stram.util.LRUCache;
+
 /**
  * <p>PubSubWebSocketServlet class.</p>
  *
@@ -37,8 +40,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private PubSubMessageCodec<Object> codec = new PubSubMessageCodec<Object>(mapper);
   private InternalMessageHandler internalMessageHandler = null;
   private static final int latestTopicCount = 100;
-  private LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
+  private final LRUCache<String, Long> latestTopics = new LRUCache<String, Long>(latestTopicCount, false)
   {
+    private static final long serialVersionUID = 20140131L;
     @Override
     public Long put(String key, Long value)
     {
@@ -100,7 +104,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
       topicSet = socketToTopicMap.get(webSocket);
     }
     topicSet.add(topic);
-    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
   }
 
   private synchronized void unsubscribe(PubSubWebSocket webSocket, String topic)
@@ -121,7 +125,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
     if (topicSet.isEmpty()) {
       socketToTopicMap.remove(webSocket);
     }
-    publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+    publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
   }
 
   private synchronized void unsubscribeAll(PubSubWebSocket webSocket)
@@ -134,7 +138,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
         if (wsSet.isEmpty()) {
           topicToSocketMap.remove(topic);
         }
-        publish(topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+        publish(topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
       }
       socketToTopicMap.remove(webSocket);
     }
@@ -163,7 +167,9 @@ public class PubSubWebSocketServlet extends WebSocketServlet
 
   public synchronized void publish(String topic, Object data)
   {
-    latestTopics.put(topic, System.currentTimeMillis());
+    if (!topic.endsWith("." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX) && !topic.startsWith(PubSubMessage.INTERNAL_TOPIC_PREFIX + ".")) {
+      latestTopics.put(topic, System.currentTimeMillis());
+    }
     HashSet<PubSubWebSocket> wsSet = topicToSocketMap.get(topic);
     if (wsSet != null) {
       Iterator<PubSubWebSocket> it = wsSet.iterator();
@@ -183,8 +189,8 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   private class PubSubWebSocket implements WebSocket.OnTextMessage
   {
     private Connection connection;
-    private BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
-    private Thread messengerThread = new Thread(new Messenger());
+    private final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(32);
+    private final Thread messengerThread = new Thread(new Messenger());
 
     @Override
     public void onMessage(String message)
@@ -222,13 +228,13 @@ public class PubSubWebSocketServlet extends WebSocketServlet
             }
             else if (type.equals(PubSubMessageType.SUBSCRIBE_NUM_SUBSCRIBERS)) {
               if (topic != null) {
-                subscribe(this, topic + ".numSubscribers");
-                sendData(this, topic + ".numSubscribers", new Integer(getNumSubscribers(topic)));
+                subscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
+                sendData(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX, getNumSubscribers(topic));
               }
             }
             else if (type.equals(PubSubMessageType.UNSUBSCRIBE_NUM_SUBSCRIBERS)) {
               if (topic != null) {
-                unsubscribe(this, topic + ".numSubscribers");
+                unsubscribe(this, topic + "." + PubSubMessage.NUM_SUBSCRIBERS_SUFFIX);
               }
             }
             else if (type.equals(PubSubMessageType.GET_LATEST_TOPICS)) {


[07/11] incubator-apex-core git commit: Calling filter to authenticate kerberos token for pubsub

Posted by th...@apache.org.
Calling filter to authenticate kerberos token for pubsub


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/34b92cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/34b92cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/34b92cb8

Branch: refs/heads/devel-3
Commit: 34b92cb8cce49444a91f8bc5a272da7222a91b7e
Parents: c0b1178
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Sat Dec 20 22:06:11 2014 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Aug 28 10:56:57 2015 -0700

----------------------------------------------------------------------
 PubSubWebSocketServlet.java | 33 ++++++++++++++++++++++++++++-----
 1 file changed, 28 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/34b92cb8/PubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/PubSubWebSocketServlet.java b/PubSubWebSocketServlet.java
index 7693d92..f628364 100644
--- a/PubSubWebSocketServlet.java
+++ b/PubSubWebSocketServlet.java
@@ -11,7 +11,10 @@ import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import javax.servlet.FilterChain;
 import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
 import javax.servlet.http.Cookie;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -124,6 +127,7 @@ public class PubSubWebSocketServlet extends WebSocketServlet
   @Override
   protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
   {
+    boolean handled = false;
     DTPrincipal principal = null;
     AuthDatabase auth = gateway.getAuthDatabase();
     if (gateway.isDTSessionHandled()) {
@@ -148,13 +152,32 @@ public class PubSubWebSocketServlet extends WebSocketServlet
        throw new WebApplicationException(Status.UNAUTHORIZED);
        */
       //}
-    } else if (gateway.isHadoopAuthFilterHandled()){
-      principal = auth.getUser(request.getUserPrincipal().getName());
+    } else if (gateway.isHadoopAuthFilterHandled()) {
+      final UserHolder userHolder = new UserHolder();
+      gateway.getHadoopAuthFilter().doFilter(request, response, new FilterChain()
+      {
+        @Override
+        public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse) throws IOException, ServletException
+        {
+          userHolder.username = ((HttpServletRequest)servletRequest).getUserPrincipal().getName();
+        }
+      });
+      if (response.getStatus() == HttpServletResponse.SC_OK) {
+        principal = auth.getUser(userHolder.username);
+      } else {
+        handled = true;
+      }
     }
-    if (principal != null) {
-      request.setAttribute(AUTH_ATTRIBUTE, principal);
+    if (!handled) {
+      if (principal != null) {
+        request.setAttribute(AUTH_ATTRIBUTE, principal);
+      }
+      super.service(request, response);
     }
-    super.service(request, response);
+  }
+
+  private class UserHolder {
+    public String username;
   }
 
   @Override