You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:12:30 UTC
svn commit: r420705 - in
/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http:
HttpClientTransport.java HttpTransportFactory.java HttpTunnelServlet.java
Author: chirino
Date: Mon Jul 10 21:12:28 2006
New Revision: 420705
URL: http://svn.apache.org/viewvc?rev=420705&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-806
and
http://issues.apache.org/activemq/browse/AMQ-807
Modified:
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
Modified: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=420705&r1=420704&r2=420705&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original)
+++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Mon Jul 10 21:12:28 2006
@@ -16,27 +16,27 @@
*/
package org.apache.activemq.transport.http;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
-import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.HeadMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URI;
-
/**
* A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the <a
* href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a>
@@ -46,14 +46,16 @@
*/
public class HttpClientTransport extends HttpTransportSupport {
private static final Log log = LogFactory.getLog(HttpClientTransport.class);
-
public static final int MAX_CLIENT_TIMEOUT = 30000;
+ private static final IdGenerator clientIdGenerator = new IdGenerator();
+
private HttpClient sendHttpClient;
private HttpClient receiveHttpClient;
- private String clientID;
-// private String sessionID;
-
+
+ private final String clientID = clientIdGenerator.generateId();
+ private boolean trace;
+
public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
super(wireFormat, remoteUrl);
}
@@ -63,19 +65,23 @@
}
public void oneway(Command command) throws IOException {
- if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE)
- clientID = ((ConnectionInfo) command).getClientId();
-
+
+ if( isStopped() ) {
+ throw new IOException("stopped.");
+ }
+
PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
configureMethod(httpMethod);
httpMethod.setRequestBody(getTextWireFormat().toString(command));
try {
+
HttpClient client = getSendHttpClient();
client.setTimeout(MAX_CLIENT_TIMEOUT);
int answer = client.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) {
throw new IOException("Failed to post command: " + command + " as response was: " + answer);
}
+
// checkSession(httpMethod);
} catch (IOException e) {
throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
@@ -90,10 +96,12 @@
}
public void run() {
+
log.trace("HTTP GET consumer thread starting: " + this);
HttpClient httpClient = getReceiveHttpClient();
URI remoteUrl = getRemoteUrl();
- while (!isStopped()) {
+
+ while ( !isStopped() && !isStopping() ) {
GetMethod httpMethod = new GetMethod(remoteUrl.toString());
configureMethod(httpMethod);
@@ -102,25 +110,34 @@
int answer = httpClient.executeMethod(httpMethod);
if (answer != HttpStatus.SC_OK) {
if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
- log.info("GET timed out");
+ log.debug("GET timed out");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ onException(new InterruptedIOException());
+ break;
+ }
}
else {
- log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
+ onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
+ break;
}
}
else {
// checkSession(httpMethod);
- Command command = getTextWireFormat().readCommand(new DataInputStream(httpMethod.getResponseBodyAsStream()));
+ DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
+
+ Command command = getTextWireFormat().readCommand(stream);
if (command == null) {
log.warn("Received null command from url: " + remoteUrl);
- }
- else {
+ } else {
doConsume(command);
}
}
}
catch (IOException e) {
- log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
+ onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl+" Reason: "+e.getMessage(),e));
+ break;
} finally {
httpMethod.getResponseBody();
httpMethod.releaseConnection();
@@ -154,8 +171,24 @@
// Implementation methods
// -------------------------------------------------------------------------
+ protected void doStart() throws Exception {
+
+ log.trace("HTTP GET consumer thread starting: " + this);
+ HttpClient httpClient = getReceiveHttpClient();
+ URI remoteUrl = getRemoteUrl();
+
+ HeadMethod httpMethod = new HeadMethod(remoteUrl.toString());
+ configureMethod(httpMethod);
+
+ int answer = httpClient.executeMethod(httpMethod);
+ if (answer != HttpStatus.SC_OK) {
+ throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
+ }
+
+ super.doStart();
+ }
+
protected void doStop(ServiceStopper stopper) throws Exception {
- // TODO
}
protected HttpClient createHttpClient() {
@@ -163,14 +196,16 @@
}
protected void configureMethod(HttpMethod method) {
-// if (sessionID != null) {
-// method.addRequestHeader("Cookie", "JSESSIONID=" + sessionID);
-// }
-// else
- if (clientID != null) {
- method.setRequestHeader("clientID", clientID);
- }
+ method.setRequestHeader("clientID", clientID);
}
+
+ public boolean isTrace() {
+ return trace;
+ }
+
+ public void setTrace(boolean trace) {
+ this.trace = trace;
+ }
// protected void checkSession(HttpMethod client) {
// Header header = client.getRequestHeader("Set-Cookie");
Modified: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java?rev=420705&r1=420704&r2=420705&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java Mon Jul 10 21:12:28 2006
@@ -17,12 +17,13 @@
package org.apache.activemq.transport.http;
import java.io.IOException;
-import java.net.MalformedURLException;
import java.net.URI;
+import java.util.Map;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.transport.xstream.XStreamWireFormat;
@@ -51,10 +52,18 @@
return "xstream";
}
- protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException {
- TextWireFormat textWireFormat = asTextWireFormat(wf);
- Transport transport = new HttpClientTransport(textWireFormat, location);
- return transport;
+ protected Transport createTransport(URI location, WireFormat wf) throws IOException {
+ TextWireFormat textWireFormat = asTextWireFormat(wf);
+ return new HttpClientTransport(textWireFormat, location);
+ }
+
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport, format, options);
+ transport = httpTransport;
+ if( httpTransport.isTrace() ) {
+ transport = new TransportLogger(httpTransport);
+ }
+ return transport;
}
}
Modified: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=420705&r1=420704&r2=420705&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Mon Jul 10 21:12:28 2006
@@ -26,10 +26,8 @@
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.util.TextWireFormat;
@@ -67,30 +65,34 @@
wireFormat = createWireFormat();
}
}
-
+
+ protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ createTransportChannel(request, response);
+ }
+
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
// lets return the next response
Command packet = null;
+ int count=0;
try {
- BlockingQueueTransport transportChannel = getTransportChannel(request);
- if (transportChannel == null) {
- response.sendError(HttpServletResponse.SC_BAD_REQUEST, "clientID not specified.");
- log("No transport available! ");
+ BlockingQueueTransport transportChannel = getTransportChannel(request, response);
+ if (transportChannel == null)
return;
- }
+
packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
+
+ DataOutputStream stream = new DataOutputStream(response.getOutputStream());
+// while( packet !=null ) {
+ wireFormat.marshal(packet, stream);
+ count++;
+// packet = (Command) transportChannel.getQueue().poll(0, TimeUnit.MILLISECONDS);
+// }
+
+ } catch (InterruptedException ignore) {
}
- catch (InterruptedException e) {
- // ignore
- }
- if (packet == null) {
- // TODO temporary hack to prevent busy loop. Replace with continuations
- try{ Thread.sleep(250);}catch (InterruptedException e) { e.printStackTrace(); }
+ if (count == 0) {
response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
}
- else {
- wireFormat.marshal(packet, new DataOutputStream(response.getOutputStream()));
- }
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@@ -104,20 +106,13 @@
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
}
- }
- else {
- if (command instanceof ConnectionInfo) {
- ConnectionInfo info = (ConnectionInfo) command;
- request.getSession(true).setAttribute("clientID", info.getClientId());
- }
+ } else {
- BlockingQueueTransport transport = getTransportChannel(request);
- if (transport == null) {
- response.setStatus(HttpServletResponse.SC_NOT_FOUND);
- }
- else {
- transport.doConsume(command);
- }
+ BlockingQueueTransport transport = getTransportChannel(request, response);
+ if (transport == null)
+ return;
+
+ transport.doConsume(command);
}
}
@@ -142,39 +137,43 @@
return buffer.toString();
}
- protected BlockingQueueTransport getTransportChannel(HttpServletRequest request) {
- HttpSession session = request.getSession(true);
- String clientID = null;
- if (session != null) {
- clientID = (String) session.getAttribute("clientID");
- }
- if (clientID == null) {
- clientID = request.getHeader("clientID");
- }
- /**
- * if (clientID == null) { clientID = request.getParameter("clientID"); }
- */
+ protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String clientID = request.getHeader("clientID");
if (clientID == null) {
- log.warn("No clientID header so ignoring request");
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
+ log.warn("No clientID header specified");
return null;
}
synchronized (this) {
BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
if (answer == null) {
- answer = createTransportChannel();
- clients.put(clientID, answer);
- listener.onAccept(answer);
- }
- else {
- /*
- try {
- answer.oneway(ping);
- }
- catch (IOException e) {
- log.warn("Failed to ping transport: " + e, e);
- }
- */
+ log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID);
+ return null;
}
+ return answer;
+ }
+ }
+
+ protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ String clientID = request.getHeader("clientID");
+
+ if (clientID == null) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
+ log.warn("No clientID header specified");
+ return null;
+ }
+
+ synchronized (this) {
+ BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
+ if (answer != null) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established");
+ log.warn("A session for clientID '"+clientID+"' has allready been established");
+ return null;
+ }
+
+ answer = createTransportChannel();
+ clients.put(clientID, answer);
+ listener.onAccept(answer);
return answer;
}
}