You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/04 21:02:02 UTC

svn commit: r771400 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: net/http/HttpConnection.java net/http/HttpRequest.java net/http/HttpRequestHandler.java service/HttpRequestVerbHandler.java service/StorageService.java

Author: jbellis
Date: Mon May  4 19:02:01 2009
New Revision: 771400

URL: http://svn.apache.org/viewvc?rev=771400&view=rev
Log:
refactor HttpConnection to use its own executor instead of abusing MessagingService.  This will let us refactor Message body to a byte[].
patch by jbellis; reviewed by nk11 for CASSANDRA-120

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java
      - copied, changed from r771399, incubator/cassandra/trunk/src/java/org/apache/cassandra/service/HttpRequestVerbHandler.java
Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/HttpRequestVerbHandler.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequest.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpConnection.java?rev=771400&r1=771399&r2=771400&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpConnection.java Mon May  4 19:02:01 2009
@@ -25,22 +25,17 @@
 package org.apache.cassandra.net.http;
 
 import java.util.*;
-import java.net.*;
-import java.io.*;
-import java.nio.*;
-import java.nio.channels.SelectionKey;
+import java.util.concurrent.ExecutorService;
+import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import org.apache.cassandra.service.*;
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import java.nio.channels.SelectionKey;
+import java.io.IOException;
+
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.SelectionKeyHandler;
-import org.apache.cassandra.net.SelectorManager;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.log4j.Logger;
 
 /**
@@ -53,13 +48,15 @@
     public static final String httpRequestVerbHandler_ = "HTTP-REQUEST-VERB-HANDLER";
     public static final String httpStage_ = "HTTP-STAGE";
 
+    private static ExecutorService executor_ = new DebuggableThreadPoolExecutor("HTTP-CONNECTION");
+
     /*
      * These are the callbacks into who ever intends
      * to listen on the client socket.
      */
     public interface HttpConnectionListener
     {
-        public void onRequest(HttpRequest httpRequest);
+        public void onRequest(org.apache.cassandra.net.http.HttpRequest httpRequest);
         public void onResponse(HttpResponse httpResponse);
     }
 
@@ -84,7 +81,7 @@
     private List<ByteBuffer> bodyBuffers_ = new LinkedList<ByteBuffer>();
     private boolean shouldClose_ = false;
     private String defaultContentType_ = "text/html";
-    private HttpRequest currentRequest_ = null;
+    private org.apache.cassandra.net.http.HttpRequest currentRequest_ = null;
     private HttpResponse currentResponse_ = null;
     private HttpStartLineParser startLineParser_ = new HttpStartLineParser(this);
     private HttpHeaderParser headerParser_ = new HttpHeaderParser(this);
@@ -126,28 +123,6 @@
         }
     }
 
-    public static class HttpRequestMessage
-    {
-        private HttpRequest httpRequest_;
-        private HttpConnection httpConnection_;
-
-        HttpRequestMessage(HttpRequest httpRequest, HttpConnection httpConnection)
-        {
-            httpRequest_ = httpRequest;
-            httpConnection_ = httpConnection;
-        }
-
-        public HttpRequest getHttpRequest()
-        {
-            return httpRequest_;
-        }
-
-        public HttpConnection getHttpConnection()
-        {
-            return httpConnection_;
-        }
-    }
-
     /*
      *  Read called on the Selector thread. This is called
      *  when there is some HTTP request that needs to be
@@ -333,7 +308,7 @@
                             currentRequest_.addHeader("Content-Type", defaultContentType_);
                         }
 
-                        handleRequest(currentRequest_);
+                        executor_.submit(new HttpRequestHandler(currentRequest_));
                     }
                     else if (currentMsgType_ == HttpMessageType.RESPONSE)
                     {
@@ -380,13 +355,6 @@
         }
     }
 
-    private void handleRequest(HttpRequest request)
-    {
-        HttpConnection.HttpRequestMessage httpRequestMessage = new HttpConnection.HttpRequestMessage(request, this);
-        Message httpMessage = new Message(null, HttpConnection.httpStage_, HttpConnection.httpRequestVerbHandler_, new Object[]{httpRequestMessage});
-        MessagingService.receive(httpMessage);
-    }
-
     // HttpStartLineParser.Callback interface implementation
     public void onStartLine(String method, String path, String query, String version)
     {
@@ -403,7 +371,7 @@
         {
                 // request
                 currentMsgType_ = HttpMessageType.REQUEST;
-                currentRequest_ = new HttpRequest();
+                currentRequest_ = new HttpRequest(this);
                 currentRequest_.setStartLine(method, path, query, version);
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequest.java?rev=771400&r1=771399&r2=771400&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequest.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequest.java Mon May  4 19:02:01 2009
@@ -22,6 +22,8 @@
 
 package org.apache.cassandra.net.http;
 
+import org.apache.zookeeper.data.StatPersisted;
+
 import java.util.*;
 import java.io.*;
 import java.net.URLDecoder;
@@ -40,6 +42,12 @@
     private String path_;
     private String query_;
     private String version_;
+    private HttpConnection connection_;
+
+    public HttpRequest(HttpConnection connection)
+    {
+        this.connection_ = connection;
+    }
 
     /*
      * Returns the type of method - GET, POST, etc.
@@ -206,5 +214,10 @@
 
         return sw.toString();
     }
+
+    public HttpConnection getHttpConnection()
+    {
+        return connection_;
+    }
 }
 

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java (from r771399, incubator/cassandra/trunk/src/java/org/apache/cassandra/service/HttpRequestVerbHandler.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/HttpRequestVerbHandler.java&r1=771399&r2=771400&rev=771400&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/HttpRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/http/HttpRequestHandler.java Mon May  4 19:02:01 2009
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.service;
+package org.apache.cassandra.net.http;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
@@ -41,25 +41,24 @@
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.http.ColumnFamilyFormatter;
 import org.apache.cassandra.net.http.HTMLFormatter;
 import org.apache.cassandra.net.http.HttpConnection;
-import org.apache.cassandra.net.http.HttpRequest;
 import org.apache.cassandra.net.http.HttpWriteResponse;
 import org.apache.cassandra.procedures.GroovyScriptRunner;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.service.StorageService;
 
 /*
  * This class handles the incoming HTTP request after
  * it has been parsed.
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
-public class HttpRequestVerbHandler implements IVerbHandler
+public class HttpRequestHandler implements Runnable
 {
-    private static final Logger logger_ = Logger.getLogger(HttpRequestVerbHandler.class);
+    private static final Logger logger_ = Logger.getLogger(HttpRequestHandler.class);
     /* These are the list of actions supported */
     private static final String DETAILS = "details";
     private static final String LOADME = "loadme";
@@ -77,41 +76,42 @@
     private static final String JS_UPDATE_INSERT_FUNCTION = "updateInsertResults";
 
     private StorageService storageService_;
+    private HttpRequest request_;
 
-    public HttpRequestVerbHandler(StorageService storageService)
+    public HttpRequestHandler(HttpRequest request)
     {
-        storageService_ = storageService;
+        request_ = request;
+        storageService_ = StorageService.instance();
     }
 
-    public void doVerb(Message message)
+    public void run()
     {
-        HttpConnection.HttpRequestMessage httpRequestMessage = (HttpConnection.HttpRequestMessage)message.getMessageBody()[0];
-        try
+        HttpWriteResponse httpServerResponse = new HttpWriteResponse(request_);
+        if(request_.getMethod().toUpperCase().equals("GET"))
         {
-            HttpRequest httpRequest = httpRequestMessage.getHttpRequest();
-            HttpWriteResponse httpServerResponse = new HttpWriteResponse(httpRequest);
-            if(httpRequest.getMethod().toUpperCase().equals("GET"))
-            {
-                // handle the get request type
-                doGet(httpRequest, httpServerResponse);
-            }
-            else if(httpRequest.getMethod().toUpperCase().equals("POST"))
-            {
-                // handle the POST request type
-                doPost(httpRequest, httpServerResponse);
-            }
+            // handle the get request type
+            doGet(request_, httpServerResponse);
+        }
+        else if(request_.getMethod().toUpperCase().equals("POST"))
+        {
+            // handle the POST request type
+            doPost(request_, httpServerResponse);
+        }
 
-            // write the response we have constructed into the socket
-            ByteBuffer buffer = httpServerResponse.flush();
-            httpRequestMessage.getHttpConnection().write(buffer);
+        // write the response we have constructed into the socket
+        ByteBuffer buffer = null;
+        try
+        {
+            buffer = httpServerResponse.flush();
         }
-        catch(Exception e)
+        catch (Exception e)
         {
-            logger_.warn(LogUtil.throwableToString(e));
+            throw new RuntimeException(e);
         }
+        request_.getHttpConnection().write(buffer);
     }
 
-    private void doGet(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+    private void doGet(org.apache.cassandra.net.http.HttpRequest httpRequest, HttpWriteResponse httpResponse)
     {
         boolean fServeSummary = true;
         HTMLFormatter formatter = new HTMLFormatter();
@@ -180,7 +180,7 @@
      * As a result of the POST query, we currently only send back some
      * javascript that updates the data in some place on the browser.
     */
-    private void doPost(HttpRequest httpRequest, HttpWriteResponse httpResponse)
+    private void doPost(org.apache.cassandra.net.http.HttpRequest httpRequest, HttpWriteResponse httpResponse)
     {
         String query = httpRequest.getQuery();
 
@@ -484,7 +484,7 @@
     /*
      * Handle the query of some data from the client.
      */
-    private String handleQuery(HttpRequest httpRequest)
+    private String handleQuery(org.apache.cassandra.net.http.HttpRequest httpRequest)
     {
     	boolean fQuerySuccess = false;
     	String sRetVal = "";
@@ -538,7 +538,7 @@
     /*
      * Handle the query of some data from the client.
      */
-    private String handleInsert(HttpRequest httpRequest)
+    private String handleInsert(org.apache.cassandra.net.http.HttpRequest httpRequest)
     {
     	boolean fInsertSuccess = false;
     	String sRetVal = "";
@@ -580,7 +580,7 @@
     /*
      * Handle the script to be run on the server.
      */
-    private String handleScript(HttpRequest httpRequest)
+    private String handleScript(org.apache.cassandra.net.http.HttpRequest httpRequest)
     {
     	boolean fQuerySuccess = false;
     	String sRetVal = "";

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=771400&r1=771399&r2=771400&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May  4 19:02:01 2009
@@ -21,7 +21,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -32,7 +31,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -70,8 +68,6 @@
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.BootstrapMetadataVerbHandler;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
@@ -296,7 +292,6 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateVerbHandler_, new Table.BootStrapInitiateVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
-        MessagingService.getMessagingInstance().registerVerbHandlers(HttpConnection.httpRequestVerbHandler_, new HttpRequestVerbHandler(this) );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );