You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/01 21:53:24 UTC

svn commit: r1029830 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/...

Author: chirino
Date: Mon Nov  1 20:53:23 2010
New Revision: 1029830

URL: http://svn.apache.org/viewvc?rev=1029830&view=rev
Log:
Protocol handlers can now supply a richer ConnectionStatus dto so that users can see more details about that current status of a connection.

Added:
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java
      - copied, changed from r1029829, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml
      - copied, changed from r1029829, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Mon Nov  1 20:53:23 2010
@@ -23,6 +23,7 @@ import protocol.{ProtocolHandler}
 import org.apache.activemq.apollo.transport.{DefaultTransportListener, Transport}
 import org.apache.activemq.apollo.util.{Log, BaseService}
 import org.apache.activemq.apollo.filter.BooleanExpression
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -115,6 +116,27 @@ class BrokerConnection(val connector: Co
     super.onRefill
     protocolHandler.onRefill
   }
+
+  def get_connection_status = {
+    val result = if( protocolHandler==null ) {
+      new ConnectionStatusDTO
+    } else {
+      protocolHandler.create_connection_status
+    }
+
+    result.id = id
+    result.state = serviceState.toString
+    result.state_since = serviceState.since
+    result.protocol = protocolHandler.protocol
+    result.transport = transport.getTypeId
+    result.remote_address = transport.getRemoteAddress
+    val wf = transport.getProtocolCodec
+    if( wf!=null ) {
+      result.write_counter = wf.getWriteCounter
+      result.read_counter = wf.getReadCounter
+    }
+    result
+  }
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Mon Nov  1 20:53:23 2010
@@ -22,6 +22,7 @@ import org.apache.activemq.apollo.util.C
 import org.apache.activemq.apollo.store.MessageRecord
 import org.apache.activemq.apollo.transport._
 import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO
 
 /**
  * <p>
@@ -80,4 +81,5 @@ trait ProtocolHandler extends DefaultTra
     connection.stop()
   }
 
+  def create_connection_status = new ConnectionStatusDTO
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Mon Nov  1 20:53:23 2010
@@ -46,7 +46,7 @@ public class ConnectionStatusDTO extends
 	public long write_counter;
 
     /**
-     * The protocol the connection is using.
+     * The transport the connection is using.
      */
 	@XmlAttribute
 	public String transport;
@@ -63,9 +63,4 @@ public class ConnectionStatusDTO extends
 	@XmlAttribute(name="remote-address")
 	public String remote_address;
 
-    /**
-     * The connected user
-     */
-	@XmlAttribute
-	public String user;
 }

Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java (from r1029829, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java&r1=1029829&r2=1029830&rev=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java Mon Nov  1 20:53:23 2010
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.dto;
 
-import org.codehaus.jackson.annotate.JsonProperty;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -29,43 +27,33 @@ import javax.xml.bind.annotation.XmlRoot
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-@XmlRootElement(name="connection-status")
+@XmlRootElement(name="stomp-connection-status")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectionStatusDTO extends ServiceStatusDTO {
-
-    /**
-     * The number of bytes that have been read from the connection.
-     */
-	@XmlAttribute(name="read-counter")
-	public long read_counter;
+public class StompConnectionStatusDTO extends ConnectionStatusDTO {
 
     /**
-     * The number of bytes that have been written to the connection.
+     * The version of the STOMP protocol being used.
      */
-	@XmlAttribute(name="write-counter")
-	public long write_counter;
+	@XmlAttribute(name="protocol-version")
+	public String protocol_version;
 
     /**
-     * The protocol the connection is using.
+     * The connected user
      */
 	@XmlAttribute
-	public String transport;
+	public String user;
 
     /**
-     * The protocol the connection is using.
+     * What the connection is currently waiting on
      */
-	@XmlAttribute
-	public String protocol;
+    @XmlAttribute(name="waiting-on")
+	public String waiting_on;
 
     /**
-     * The remote address of the connection
+     * Opens subscriptions that the connection has created.
      */
-	@XmlAttribute(name="remote-address")
-	public String remote_address;
+    @XmlAttribute(name="subscription-count")
+	public int subscription_count;
+
 
-    /**
-     * The connected user
-     */
-	@XmlAttribute
-	public String user;
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Mon Nov  1 20:53:23 2010
@@ -39,4 +39,5 @@ StringIdLabeledDTO
 StringIdListDTO
 TimeMetricDTO
 VirtualHostDTO
-VirtualHostStatusDTO
\ No newline at end of file
+VirtualHostStatusDTO
+StompConnectionStatusDTO
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Nov  1 20:53:23 2010
@@ -33,9 +33,9 @@ import org.apache.activemq.apollo.filter
 import org.apache.activemq.apollo.transport._
 import org.apache.activemq.apollo.store._
 import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
+import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -176,7 +176,6 @@ class StompProtocolHandler extends Proto
 
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-
   trait AckHandler {
     def track(delivery:Delivery):Unit
     def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit
@@ -353,15 +352,29 @@ class StompProtocolHandler extends Proto
 
   var session_id:AsciiBuffer = _
   var protocol_version:AsciiBuffer = _
+  var login:Option[AsciiBuffer] = None
+  var passcode:Option[AsciiBuffer] = None
 
   var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
 
+  var waiting_on:String = "client request"
+
+
+  override def create_connection_status = {
+    var rc = new StompConnectionStatusDTO
+    rc.protocol_version = if( protocol_version == null ) null else protocol_version.toString
+    rc.user = login.map(_.toString).getOrElse(null)
+    rc.subscription_count = consumers.size
+    rc.waiting_on = waiting_on
+    rc
+  }
+
   override def onTransportConnected() = {
 
     session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame)
     connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
     connection_sink.refiller = ^{}
-    connection.transport.resumeRead
+    resumeRead
 
   }
 
@@ -448,8 +461,21 @@ class StompProtocolHandler extends Proto
     }
   }
 
+
+  def suspendRead(reason:String) = {
+    waiting_on = reason
+    connection.transport.suspendRead
+  }
+  def resumeRead() = {
+    waiting_on = "client request"
+    connection.transport.resumeRead
+  }
+
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
+    login = get(headers, LOGIN)
+    passcode = get(headers, PASSCODE)
+
     val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
     protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
       case Some(x) => x
@@ -498,8 +524,7 @@ class StompProtocolHandler extends Proto
         return
     }
 
-    connection.transport.suspendRead
-
+    suspendRead("virtual host lookup")
     val host_header = get(headers, HOST)
     val cb: (VirtualHost)=>Unit = (host)=>
       queue {
@@ -520,7 +545,7 @@ class StompProtocolHandler extends Proto
             val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
             wf.memory_pool = this.host.direct_buffer_pool
           }
-          connection.transport.resumeRead
+          resumeRead
 
         } else {
           die("Invalid virtual host: "+host_header.get)
@@ -590,9 +615,9 @@ class StompProtocolHandler extends Proto
         host.router.connect(destiantion, producer) {
           route =>
             if (!connection.stopped) {
-              connection.transport.resumeRead
+              resumeRead
               route.refiller = ^ {
-                connection.transport.resumeRead
+                resumeRead
               }
               producerRoutes.put(destiantion, route)
               send_via_route(route, frame, uow)
@@ -648,7 +673,7 @@ class StompProtocolHandler extends Proto
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more stomp messages
         // until it's not full anymore.
-        connection.transport.suspendRead
+        suspendRead("blocked destination: "+route.destination)
       }
 
     } else {
@@ -838,7 +863,7 @@ class StompProtocolHandler extends Proto
 
   private def _die(headers:HeaderMap, explained:String="") = {
     if( !connection.stopped ) {
-      connection.transport.suspendRead
+      suspendRead("shutdown")
       connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained))) )
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
@@ -850,7 +875,7 @@ class StompProtocolHandler extends Proto
 
   override def onTransportFailure(error: IOException) = {
     if( !connection.stopped ) {
-      connection.transport.suspendRead
+      suspendRead("shutdown")
       info(error, "Shutting connection down due to: %s", error)
       super.onTransportFailure(error);
     }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Mon Nov  1 20:53:23 2010
@@ -278,19 +278,7 @@ case class RuntimeResource(parent:Broker
         case None => cb(None)
         case Some(connection:BrokerConnection) =>
           connection.dispatchQueue {
-            val result = new ConnectionStatusDTO
-            result.id = connection.id
-            result.state = connection.serviceState.toString
-            result.state_since = connection.serviceState.since
-            result.protocol = connection.protocolHandler.protocol
-            result.transport = connection.transport.getTypeId
-            result.remote_address = connection.transport.getRemoteAddress
-            val wf = connection.transport.getProtocolCodec
-            if( wf!=null ) {
-              result.write_counter = wf.getWriteCounter
-              result.read_counter = wf.getReadCounter
-            }
-            cb(Some(result))
+            cb(Some(connection.get_connection_status))
           }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml Mon Nov  1 20:53:23 2010
@@ -18,9 +18,8 @@
 - import helper._
 
 %p state: #{state} for #{ uptime(state_since) }
+%p remote address: #{remote_address}
+%p protocol: #{protocol}
+%p transport: #{transport}
 %p read counter: #{memory(read_counter)}
 %p write counter: #{memory(write_counter)}
-%p transport: #{transport}
-%p protocol: #{protocol}
-%p remote address: #{remote_address}
-%p user: #{user}
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml (from r1029829, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml?p2=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml&p1=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml&r1=1029829&r2=1029830&rev=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml Mon Nov  1 20:53:23 2010
@@ -4,9 +4,9 @@
 -# The ASF licenses this file to You under the Apache License, Version 2.0
 -# (the "License"); you may not use this file except in compliance with
 -# the License.  You may obtain a copy of the License at
--# 
+-#
 -# http://www.apache.org/licenses/LICENSE-2.0
--# 
+-#
 -# Unless required by applicable law or agreed to in writing, software
 -# distributed under the License is distributed on an "AS IS" BASIS,
 -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,13 @@
 - import helper._
 
 %p state: #{state} for #{ uptime(state_since) }
+%p remote address: #{remote_address}
+%p protocol: #{protocol}
+%p transport: #{transport}
 %p read counter: #{memory(read_counter)}
 %p write counter: #{memory(write_counter)}
-%p transport: #{transport}
-%p protocol: #{protocol}
-%p remote address: #{remote_address}
-%p user: #{user}
\ No newline at end of file
+
+%p protocol version: #{protocol_version}
+%p user: #{user}
+%p waiting on: #{waiting_on}
+%p subscription count: #{subscription_count}