You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/11/01 21:53:24 UTC
svn commit: r1029830 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/...
Author: chirino
Date: Mon Nov 1 20:53:23 2010
New Revision: 1029830
URL: http://svn.apache.org/viewvc?rev=1029830&view=rev
Log:
Protocol handlers can now supply a richer ConnectionStatus dto so that users can see more details about the current status of a connection.
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java
- copied, changed from r1029829, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml
- copied, changed from r1029829, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Mon Nov 1 20:53:23 2010
@@ -23,6 +23,7 @@ import protocol.{ProtocolHandler}
import org.apache.activemq.apollo.transport.{DefaultTransportListener, Transport}
import org.apache.activemq.apollo.util.{Log, BaseService}
import org.apache.activemq.apollo.filter.BooleanExpression
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -115,6 +116,27 @@ class BrokerConnection(val connector: Co
super.onRefill
protocolHandler.onRefill
}
+
+ // Builds the status DTO for this connection. The protocol handler, when present,
+ // supplies the (possibly richer) DTO instance; the common fields are filled in here.
+ def get_connection_status = {
+ // protocolHandler may be null, in which case the plain base DTO is used.
+ val result = if( protocolHandler==null ) {
+ new ConnectionStatusDTO
+ } else {
+ protocolHandler.create_connection_status
+ }
+
+ result.id = id
+ result.state = serviceState.toString
+ result.state_since = serviceState.since
+ // Guarded: reading protocolHandler.protocol unconditionally would throw a
+ // NullPointerException in exactly the case the null check above allows for.
+ result.protocol = if( protocolHandler==null ) null else protocolHandler.protocol
+ result.transport = transport.getTypeId
+ result.remote_address = transport.getRemoteAddress
+ // The byte counters are only available when the transport has a protocol codec.
+ val wf = transport.getProtocolCodec
+ if( wf!=null ) {
+ result.write_counter = wf.getWriteCounter
+ result.read_counter = wf.getReadCounter
+ }
+ result
+ }
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ProtocolFactory.scala Mon Nov 1 20:53:23 2010
@@ -22,6 +22,7 @@ import org.apache.activemq.apollo.util.C
import org.apache.activemq.apollo.store.MessageRecord
import org.apache.activemq.apollo.transport._
import org.apache.activemq.apollo.broker.{Delivery, Message, BrokerConnection}
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO
/**
* <p>
@@ -80,4 +81,5 @@ trait ProtocolHandler extends DefaultTra
connection.stop()
}
+ def create_connection_status = new ConnectionStatusDTO
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Mon Nov 1 20:53:23 2010
@@ -46,7 +46,7 @@ public class ConnectionStatusDTO extends
public long write_counter;
/**
- * The protocol the connection is using.
+ * The transport the connection is using.
*/
@XmlAttribute
public String transport;
@@ -63,9 +63,4 @@ public class ConnectionStatusDTO extends
@XmlAttribute(name="remote-address")
public String remote_address;
- /**
- * The connected user
- */
- @XmlAttribute
- public String user;
}
Copied: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java (from r1029829, activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java&r1=1029829&r2=1029830&rev=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.java Mon Nov 1 20:53:23 2010
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonProperty;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -29,43 +27,33 @@ import javax.xml.bind.annotation.XmlRoot
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="connection-status")
+@XmlRootElement(name="stomp-connection-status")
@XmlAccessorType(XmlAccessType.FIELD)
-public class ConnectionStatusDTO extends ServiceStatusDTO {
-
- /**
- * The number of bytes that have been read from the connection.
- */
- @XmlAttribute(name="read-counter")
- public long read_counter;
+public class StompConnectionStatusDTO extends ConnectionStatusDTO {
/**
- * The number of bytes that have been written to the connection.
+ * The version of the STOMP protocol being used.
*/
- @XmlAttribute(name="write-counter")
- public long write_counter;
+ @XmlAttribute(name="protocol-version")
+ public String protocol_version;
/**
- * The protocol the connection is using.
+ * The connected user
*/
@XmlAttribute
- public String transport;
+ public String user;
/**
- * The protocol the connection is using.
+ * What the connection is currently waiting on
*/
- @XmlAttribute
- public String protocol;
+ @XmlAttribute(name="waiting-on")
+ public String waiting_on;
/**
- * The remote address of the connection
+ * The number of open subscriptions that the connection has created.
*/
- @XmlAttribute(name="remote-address")
- public String remote_address;
+ @XmlAttribute(name="subscription-count")
+ public int subscription_count;
+
- /**
- * The connected user
- */
- @XmlAttribute
- public String user;
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index Mon Nov 1 20:53:23 2010
@@ -39,4 +39,5 @@ StringIdLabeledDTO
StringIdListDTO
TimeMetricDTO
VirtualHostDTO
-VirtualHostStatusDTO
\ No newline at end of file
+VirtualHostStatusDTO
+StompConnectionStatusDTO
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Mon Nov 1 20:53:23 2010
@@ -33,9 +33,9 @@ import org.apache.activemq.apollo.filter
import org.apache.activemq.apollo.transport._
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
-import org.apache.activemq.apollo.dto.{BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
+import org.apache.activemq.apollo.dto.{StompConnectionStatusDTO, BindingDTO, DurableSubscriptionBindingDTO, PointToPointBindingDTO}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -176,7 +176,6 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
-
trait AckHandler {
def track(delivery:Delivery):Unit
def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit
@@ -353,15 +352,29 @@ class StompProtocolHandler extends Proto
var session_id:AsciiBuffer = _
var protocol_version:AsciiBuffer = _
+ var login:Option[AsciiBuffer] = None
+ var passcode:Option[AsciiBuffer] = None
var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
+ var waiting_on:String = "client request"
+
+
+ // Supplies the STOMP-specific status DTO: negotiated protocol version, login,
+ // number of consumers and what the connection is currently waiting on.
+ override def create_connection_status = {
+ val status = new StompConnectionStatusDTO
+ // protocol_version and login may both be unset; report null for either.
+ status.protocol_version = Option(protocol_version).map(_.toString).orNull
+ status.user = login.map(_.toString).orNull
+ status.subscription_count = consumers.size
+ status.waiting_on = waiting_on
+ status
+ }
+
override def onTransportConnected() = {
session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame)
connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
connection_sink.refiller = ^{}
- connection.transport.resumeRead
+ resumeRead
}
@@ -448,8 +461,21 @@ class StompProtocolHandler extends Proto
}
}
+
+ def suspendRead(reason:String) = { // Suspends reads on the connection's transport, recording why.
+ waiting_on = reason // reported as waiting_on by create_connection_status
+ connection.transport.suspendRead
+ }
+ def resumeRead() = { // Resumes reads on the connection's transport.
+ waiting_on = "client request" // same value waiting_on is initialised with
+ connection.transport.resumeRead
+ }
+
def on_stomp_connect(headers:HeaderMap):Unit = {
+ login = get(headers, LOGIN)
+ passcode = get(headers, PASSCODE)
+
val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
case Some(x) => x
@@ -498,8 +524,7 @@ class StompProtocolHandler extends Proto
return
}
- connection.transport.suspendRead
-
+ suspendRead("virtual host lookup")
val host_header = get(headers, HOST)
val cb: (VirtualHost)=>Unit = (host)=>
queue {
@@ -520,7 +545,7 @@ class StompProtocolHandler extends Proto
val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
wf.memory_pool = this.host.direct_buffer_pool
}
- connection.transport.resumeRead
+ resumeRead
} else {
die("Invalid virtual host: "+host_header.get)
@@ -590,9 +615,9 @@ class StompProtocolHandler extends Proto
host.router.connect(destiantion, producer) {
route =>
if (!connection.stopped) {
- connection.transport.resumeRead
+ resumeRead
route.refiller = ^ {
- connection.transport.resumeRead
+ resumeRead
}
producerRoutes.put(destiantion, route)
send_via_route(route, frame, uow)
@@ -648,7 +673,7 @@ class StompProtocolHandler extends Proto
if( route.full ) {
// but once it gets full.. suspend, so that we get more stomp messages
// until it's not full anymore.
- connection.transport.suspendRead
+ suspendRead("blocked destination: "+route.destination)
}
} else {
@@ -838,7 +863,7 @@ class StompProtocolHandler extends Proto
private def _die(headers:HeaderMap, explained:String="") = {
if( !connection.stopped ) {
- connection.transport.suspendRead
+ suspendRead("shutdown")
connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained))) )
// TODO: if there are too many open connections we should just close the connection
// without waiting for the error to get sent to the client.
@@ -850,7 +875,7 @@ class StompProtocolHandler extends Proto
override def onTransportFailure(error: IOException) = {
if( !connection.stopped ) {
- connection.transport.suspendRead
+ suspendRead("shutdown")
info(error, "Shutting connection down due to: %s", error)
super.onTransportFailure(error);
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Mon Nov 1 20:53:23 2010
@@ -278,19 +278,7 @@ case class RuntimeResource(parent:Broker
case None => cb(None)
case Some(connection:BrokerConnection) =>
connection.dispatchQueue {
- val result = new ConnectionStatusDTO
- result.id = connection.id
- result.state = connection.serviceState.toString
- result.state_since = connection.serviceState.since
- result.protocol = connection.protocolHandler.protocol
- result.transport = connection.transport.getTypeId
- result.remote_address = connection.transport.getRemoteAddress
- val wf = connection.transport.getProtocolCodec
- if( wf!=null ) {
- result.write_counter = wf.getWriteCounter
- result.read_counter = wf.getReadCounter
- }
- cb(Some(result))
+ cb(Some(connection.get_connection_status))
}
}
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml?rev=1029830&r1=1029829&r2=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml Mon Nov 1 20:53:23 2010
@@ -18,9 +18,8 @@
- import helper._
%p state: #{state} for #{ uptime(state_since) }
+%p remote address: #{remote_address}
+%p protocol: #{protocol}
+%p transport: #{transport}
%p read counter: #{memory(read_counter)}
%p write counter: #{memory(write_counter)}
-%p transport: #{transport}
-%p protocol: #{protocol}
-%p remote address: #{remote_address}
-%p user: #{user}
\ No newline at end of file
Copied: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml (from r1029829, activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml?p2=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml&p1=activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml&r1=1029829&r2=1029830&rev=1029830&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/ConnectionStatusDTO.scaml (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/StompConnectionStatusDTO.scaml Mon Nov 1 20:53:23 2010
@@ -4,9 +4,9 @@
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
--#
+-#
-# http://www.apache.org/licenses/LICENSE-2.0
--#
+-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,9 +18,13 @@
- import helper._
%p state: #{state} for #{ uptime(state_since) }
+%p remote address: #{remote_address}
+%p protocol: #{protocol}
+%p transport: #{transport}
%p read counter: #{memory(read_counter)}
%p write counter: #{memory(write_counter)}
-%p transport: #{transport}
-%p protocol: #{protocol}
-%p remote address: #{remote_address}
-%p user: #{user}
\ No newline at end of file
+
+%p protocol version: #{protocol_version}
+%p user: #{user}
+%p waiting on: #{waiting_on}
+%p subscription count: #{subscription_count}