You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:10:54 UTC

svn commit: r420704 - in /incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http: HttpClientTransport.java HttpTransportFactory.java HttpTunnelServlet.java

Author: chirino
Date: Mon Jul 10 21:10:53 2006
New Revision: 420704

URL: http://svn.apache.org/viewvc?rev=420704&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-806
and
http://issues.apache.org/activemq/browse/AMQ-807

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
    incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
    incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=420704&r1=420703&r2=420704&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Mon Jul 10 21:10:53 2006
@@ -16,27 +16,27 @@
  */
 package org.apache.activemq.transport.http;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
-import org.apache.commons.httpclient.Header;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpMethod;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.HeadMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URI;
-
 /**
  * A HTTP {@link org.apache.activemq.transport.TransportChannel} which uses the <a
  * href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a>
@@ -46,14 +46,16 @@
  */
 public class HttpClientTransport extends HttpTransportSupport {
     private static final Log log = LogFactory.getLog(HttpClientTransport.class);
-
     public static final int MAX_CLIENT_TIMEOUT = 30000;
 
+    private static final IdGenerator clientIdGenerator = new IdGenerator();
+
     private HttpClient sendHttpClient;
     private HttpClient receiveHttpClient;
-    private String clientID;
-//    private String sessionID;
-
+    
+    private final String clientID = clientIdGenerator.generateId();
+    private boolean trace;
+    
     public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
         super(wireFormat, remoteUrl);
     }
@@ -63,19 +65,23 @@
     }
 
     public void oneway(Command command) throws IOException {
-        if (command.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE)
-            clientID = ((ConnectionInfo) command).getClientId();
-
+    	
+    	if( isStopped() ) {
+    		throw new IOException("stopped.");
+    	}
+    	
         PostMethod httpMethod = new PostMethod(getRemoteUrl().toString());
         configureMethod(httpMethod);
         httpMethod.setRequestBody(getTextWireFormat().toString(command));
         try {
+        	
             HttpClient client = getSendHttpClient();
             client.setTimeout(MAX_CLIENT_TIMEOUT);
             int answer = client.executeMethod(httpMethod);
             if (answer != HttpStatus.SC_OK) {
                 throw new IOException("Failed to post command: " + command + " as response was: " + answer);
             }
+                        
 //            checkSession(httpMethod);
         } catch (IOException e) {
             throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
@@ -90,10 +96,12 @@
     }
 
     public void run() {
+    	
         log.trace("HTTP GET consumer thread starting: " + this);
         HttpClient httpClient = getReceiveHttpClient();
         URI remoteUrl = getRemoteUrl();
-        while (!isStopped()) {
+                
+        while ( !isStopped() && !isStopping() ) {
 
             GetMethod httpMethod = new GetMethod(remoteUrl.toString());
             configureMethod(httpMethod);
@@ -102,25 +110,34 @@
                 int answer = httpClient.executeMethod(httpMethod);
                 if (answer != HttpStatus.SC_OK) {
                     if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
-                        log.info("GET timed out");
+                        log.debug("GET timed out");
+                        try {
+							Thread.sleep(1000);
+						} catch (InterruptedException e) {
+							onException(new InterruptedIOException());
+							break;
+						}
                     }
                     else {
-                        log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
+						onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
+						break;
                     }
                 }
                 else {
 //                    checkSession(httpMethod);
-                    Command command = getTextWireFormat().readCommand(new DataInputStream(httpMethod.getResponseBodyAsStream()));
+                	DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
+                    
+                	Command command = getTextWireFormat().readCommand(stream);                    
                     if (command == null) {
                         log.warn("Received null command from url: " + remoteUrl);
-                    }
-                    else {
+                    } else {
                         doConsume(command);
                     }
                 }
             }
             catch (IOException e) {
-                log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
+				onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl+" Reason: "+e.getMessage(),e));
+				break;
             } finally {
                 httpMethod.getResponseBody();
                 httpMethod.releaseConnection();
@@ -154,8 +171,24 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
+    protected void doStart() throws Exception {
+    	
+        log.trace("HTTP GET consumer thread starting: " + this);
+        HttpClient httpClient = getReceiveHttpClient();
+        URI remoteUrl = getRemoteUrl();
+                
+        HeadMethod httpMethod = new HeadMethod(remoteUrl.toString());
+        configureMethod(httpMethod);
+
+        int answer = httpClient.executeMethod(httpMethod);
+        if (answer != HttpStatus.SC_OK) {
+			throw new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
+        }
+    	
+    	super.doStart();
+    }
+    
     protected void doStop(ServiceStopper stopper) throws Exception {
-        // TODO
     }
 
     protected HttpClient createHttpClient() {
@@ -163,14 +196,16 @@
     }
 
     protected void configureMethod(HttpMethod method) {
-//        if (sessionID != null) {
-//            method.addRequestHeader("Cookie", "JSESSIONID=" + sessionID);
-//        }
-//        else
-          if (clientID != null) {
-            method.setRequestHeader("clientID", clientID);
-        }
+        method.setRequestHeader("clientID", clientID);
     }
+
+	public boolean isTrace() {
+		return trace;
+	}
+
+	public void setTrace(boolean trace) {
+		this.trace = trace;
+	}
 
 //    protected void checkSession(HttpMethod client) {
 //        Header header = client.getRequestHeader("Set-Cookie");

Modified: incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java?rev=420704&r1=420703&r2=420704&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java Mon Jul 10 21:10:53 2006
@@ -17,12 +17,13 @@
 package org.apache.activemq.transport.http;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.Map;
 
 import org.apache.activeio.command.WireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.util.TextWireFormat;
 import org.apache.activemq.transport.xstream.XStreamWireFormat;
@@ -51,10 +52,18 @@
         return "xstream";
     }
 
-    protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException {
-        TextWireFormat textWireFormat = asTextWireFormat(wf);
-        Transport transport = new HttpClientTransport(textWireFormat, location);
-        return transport;
+    protected Transport createTransport(URI location, WireFormat wf) throws IOException {
+		TextWireFormat textWireFormat = asTextWireFormat(wf);
+		return new HttpClientTransport(textWireFormat, location);
+    }
+    
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+    	HttpClientTransport httpTransport = (HttpClientTransport) super.compositeConfigure(transport, format, options);
+		transport = httpTransport;
+    	if( httpTransport.isTrace() ) {
+			transport = new TransportLogger(httpTransport);
+    	}
+		return transport;
     }
 
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java?rev=420704&r1=420703&r2=420704&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java Mon Jul 10 21:10:53 2006
@@ -26,10 +26,8 @@
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
 
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.util.TextWireFormat;
@@ -67,30 +65,34 @@
             wireFormat = createWireFormat();
         }
     }
-
+    
+    protected void doHead(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        createTransportChannel(request, response);
+    }
+    
     protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         // lets return the next response
         Command packet = null;
+        int count=0;
         try {
-            BlockingQueueTransport transportChannel = getTransportChannel(request);
-            if (transportChannel == null) {
-                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "clientID not specified.");
-                log("No transport available! ");
+            BlockingQueueTransport transportChannel = getTransportChannel(request, response);
+            if (transportChannel == null)
                 return;
-            }
+            
             packet = (Command) transportChannel.getQueue().poll(requestTimeout, TimeUnit.MILLISECONDS);
+            
+            DataOutputStream stream = new DataOutputStream(response.getOutputStream());
+//            while( packet !=null ) {
+            	wireFormat.marshal(packet, stream);
+            	count++;
+//            	packet = (Command) transportChannel.getQueue().poll(0, TimeUnit.MILLISECONDS);
+//            }
+
+        } catch (InterruptedException ignore) {
         }
-        catch (InterruptedException e) {
-            // ignore
-        }
-        if (packet == null) {
-            // TODO temporary hack to prevent busy loop.  Replace with continuations
-            try{ Thread.sleep(250);}catch (InterruptedException e) { e.printStackTrace(); }
+        if (count == 0) {
             response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
         }
-        else {
-            wireFormat.marshal(packet, new DataOutputStream(response.getOutputStream()));
-        }
     }
 
     protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@@ -104,20 +106,13 @@
                 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
             }
 
-        }
-        else {
-            if (command instanceof ConnectionInfo) {
-                ConnectionInfo info = (ConnectionInfo) command;
-                request.getSession(true).setAttribute("clientID", info.getClientId());
-            }
+        } else {
 
-            BlockingQueueTransport transport = getTransportChannel(request);
-            if (transport == null) {
-                response.setStatus(HttpServletResponse.SC_NOT_FOUND);
-            }
-            else {
-                transport.doConsume(command);
-            }
+            BlockingQueueTransport transport = getTransportChannel(request, response);
+            if (transport == null)
+                return;
+            
+            transport.doConsume(command);
         }
     }
 
@@ -142,39 +137,43 @@
         return buffer.toString();
     }
 
-    protected BlockingQueueTransport getTransportChannel(HttpServletRequest request) {
-        HttpSession session = request.getSession(true);
-        String clientID = null;
-        if (session != null) {
-            clientID = (String) session.getAttribute("clientID");
-        }
-        if (clientID == null) {
-            clientID = request.getHeader("clientID");
-        }
-        /**
-         * if (clientID == null) { clientID = request.getParameter("clientID"); }
-         */
+    protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        String clientID = request.getHeader("clientID");
         if (clientID == null) {
-            log.warn("No clientID header so ignoring request");
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
+            log.warn("No clientID header specified");
             return null;
         }
         synchronized (this) {
             BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
             if (answer == null) {
-                answer = createTransportChannel();
-                clients.put(clientID, answer);
-                listener.onAccept(answer);
-            }
-            else {
-                /*
-                try {
-                    answer.oneway(ping);
-                }
-                catch (IOException e) {
-                    log.warn("Failed to ping transport: " + e, e);
-                }
-                */
+                log.warn("The clientID header specified is invalid. Client sesion has not yet been established for it: "+clientID);
+                return null;
             }
+            return answer;
+        }
+    }
+    
+    protected BlockingQueueTransport createTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
+        String clientID = request.getHeader("clientID");
+        
+        if (clientID == null) {
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, "No clientID header specified");
+            log.warn("No clientID header specified");
+            return null;
+        }
+        
+        synchronized (this) {
+            BlockingQueueTransport answer = (BlockingQueueTransport) clients.get(clientID);
+            if (answer != null) {
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '"+clientID+"' has allready been established");
+                log.warn("A session for clientID '"+clientID+"' has allready been established");
+                return null;
+            }
+            
+            answer = createTransportChannel();
+            clients.put(clientID, answer);
+            listener.onAccept(answer);            
             return answer;
         }
     }