You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ay...@apache.org on 2014/03/25 15:21:37 UTC

git commit: CAMEL-7316: refactor camel-ws using the refactored camel-ahc

Repository: camel
Updated Branches:
  refs/heads/master 7b67a04ed -> 6208c5afb


CAMEL-7316: refactor camel-ws using the refactored camel-ahc


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6208c5af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6208c5af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6208c5af

Branch: refs/heads/master
Commit: 6208c5afb2809987a84dafe654314783fac81250
Parents: 7b67a04
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Tue Mar 25 15:20:00 2014 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Tue Mar 25 15:20:00 2014 +0100

----------------------------------------------------------------------
 components/camel-ws/pom.xml                     |  16 +--
 .../apache/camel/component/ws/WsComponent.java  | 128 +++---------------
 .../apache/camel/component/ws/WsEndpoint.java   | 130 ++++---------------
 3 files changed, 42 insertions(+), 232 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-ws/pom.xml b/components/camel-ws/pom.xml
index 469977f..a5b7b41 100644
--- a/components/camel-ws/pom.xml
+++ b/components/camel-ws/pom.xml
@@ -41,20 +41,8 @@
       <artifactId>camel-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.ning</groupId>
-      <artifactId>async-http-client</artifactId>
-      <version>${ahc-version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>io.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>${netty3-version}</version>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-ahc</artifactId>
     </dependency>
     <dependency>
       <groupId>org.glassfish.grizzly</groupId>

http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java
index a826aa3..e6832d7 100644
--- a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java
+++ b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsComponent.java
@@ -17,127 +17,29 @@
 package org.apache.camel.component.ws;
 
 import java.net.URI;
-import java.util.Map;
 
-import com.ning.http.client.AsyncHttpClientConfig;
-import com.ning.http.client.Realm;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.DefaultComponent;
-import org.apache.camel.util.IntrospectionSupport;
-import org.apache.camel.util.URISupport;
-import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.apache.camel.util.jsse.SSLContextParameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.component.ahc.AhcComponent;
+import org.apache.camel.component.ahc.AhcEndpoint;
 
 /**
- *  Defines the <a href="http://camel.apache.org/ahc.html">Async HTTP Client Component</a>
+ *  Defines the <a href="http://camel.apache.org/ws.html">WebSocket Client Component</a>
  */
-public class WsComponent extends DefaultComponent {
-    
-    private static final Logger LOG = LoggerFactory.getLogger(WsComponent.class);
+public class WsComponent extends AhcComponent {
     
-    private static final String CLIENT_CONFIG_PREFIX = "clientConfig.";
-    private static final String CLIENT_REALM_CONFIG_PREFIX = "clientConfig.realm.";
-
-    private AsyncHttpClientConfig clientConfig;
-    private SSLContextParameters sslContextParameters;
-
+    /* (non-Javadoc)
+     * @see org.apache.camel.component.ahc.AhcComponent#createAddressUri(java.lang.String, java.lang.String)
+     */
     @Override
-    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        String addressUri = uri;
-
-        // Do not set the URI because we still have all of the Camel internal
-        // parameters in the URI at this point.
-        WsEndpoint endpoint = new WsEndpoint(uri, this);
-        endpoint.setClientConfig(getClientConfig());
-        endpoint.setSslContextParameters(getSslContextParameters());
-        
-        setProperties(endpoint, parameters);
-
-        if (IntrospectionSupport.hasProperties(parameters, CLIENT_CONFIG_PREFIX)) {
-            AsyncHttpClientConfig.Builder builder = endpoint.getClientConfig() == null 
-                    ? new AsyncHttpClientConfig.Builder() : WsComponent.cloneConfig(endpoint.getClientConfig());
-            
-            if (endpoint.getClientConfig() != null) {
-                LOG.warn("The user explicitly set an AsyncHttpClientConfig instance on the component or "
-                         + "endpoint, but this endpoint URI contains client configuration parameters.  "
-                         + "Are you sure that this is what was intended?  The URI parameters will be applied"
-                         + " to a clone of the supplied AsyncHttpClientConfig in order to prevent unintended modification"
-                         + " of the explicitly configured AsyncHttpClientConfig.  That is, the URI parameters override the"
-                         + " settings on the explicitly configured AsyncHttpClientConfig for this endpoint.");
-            }
-
-            // special for realm builder
-            Realm.RealmBuilder realmBuilder = null;
-            if (IntrospectionSupport.hasProperties(parameters, CLIENT_REALM_CONFIG_PREFIX)) {
-                realmBuilder = new Realm.RealmBuilder();
-
-                // set and validate additional parameters on client config
-                Map<String, Object> realmParams = IntrospectionSupport.extractProperties(parameters, CLIENT_REALM_CONFIG_PREFIX);
-                setProperties(realmBuilder, realmParams);
-                validateParameters(uri, realmParams, null);
-            }
-
-            // set and validate additional parameters on client config
-            Map<String, Object> clientParams = IntrospectionSupport.extractProperties(parameters, CLIENT_CONFIG_PREFIX);
-            setProperties(builder, clientParams);
-            validateParameters(uri, clientParams, null);
-
-            if (realmBuilder != null) {
-                builder.setRealm(realmBuilder.build());
-            }
-            endpoint.setClientConfig(builder.build());
-        }
-
-        SSLContextParameters sslparams = resolveAndRemoveReferenceParameter(parameters, "sslContextParameters", SSLContextParameters.class);
-        
-        // prefer to use endpoint configured over component configured
-        if (sslparams == null) {
-            // fallback to component configured
-            sslparams = getSslContextParameters();
-        }
-        if (sslparams != null) {
-            endpoint.setSslContextParameters(sslparams);
-        }
-
-        // restructure uri to be based on the parameters left as we dont want to include the Camel internal options
-        addressUri = UnsafeUriCharactersEncoder.encode(addressUri);
-        URI wsuri = URISupport.createRemainingURI(new URI(addressUri), parameters);
-        endpoint.setWsUri(wsuri);
-        
-        return endpoint;
+    protected String createAddressUri(String uri, String remaining) {
+        return uri;
     }
 
-    public AsyncHttpClientConfig getClientConfig() {
-        return clientConfig;
-    }
-
-    public void setClientConfig(AsyncHttpClientConfig clientConfig) {
-        this.clientConfig = clientConfig;
-    }
-
-    public SSLContextParameters getSslContextParameters() {
-        return sslContextParameters;
-    }
-
-    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
-        this.sslContextParameters = sslContextParameters;
-    }
-    
-    /**
-     * Creates a new client configuration builder using {@code clientConfig} as a template for
-     * the builder.
-     *
-     * @param clientConfig the instance to serve as a template for the builder
-     *
-     * @return a builder configured with the same options as the supplied config
+    /* (non-Javadoc)
+     * @see org.apache.camel.component.ahc.AhcComponent#createAhcEndpoint(java.lang.String, org.apache.camel.component.ahc.AhcComponent, java.net.URI)
      */
-    static AsyncHttpClientConfig.Builder cloneConfig(AsyncHttpClientConfig clientConfig) {
-
-        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder(clientConfig);
-
-        return builder;
+    @Override
+    protected AhcEndpoint createAhcEndpoint(String endpointUri, AhcComponent component, URI httpUri) {
+        return new WsEndpoint(endpointUri, (WsComponent)component);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/6208c5af/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java
index 67ebe2f..a4988d4 100644
--- a/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java
+++ b/components/camel-ws/src/main/java/org/apache/camel/component/ws/WsEndpoint.java
@@ -20,13 +20,10 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.CharArrayReader;
 import java.io.IOException;
-import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import javax.net.ssl.SSLContext;
-
 import com.ning.http.client.AsyncHttpClient;
 import com.ning.http.client.AsyncHttpClientConfig;
 import com.ning.http.client.AsyncHttpProvider;
@@ -39,28 +36,22 @@ import com.ning.http.client.websocket.WebSocketUpgradeHandler;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.util.jsse.SSLContextParameters;
+import org.apache.camel.component.ahc.AhcEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  *
  */
-public class WsEndpoint extends DefaultEndpoint {
+public class WsEndpoint extends AhcEndpoint {
     private static final transient Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);
 
+    // for using websocket streaming/fragments
     private static final boolean GRIZZLY_AVAILABLE = 
         probeClass("com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider");
     
-    private AsyncHttpClient client;
-    private AsyncHttpClientConfig clientConfig;
     private WebSocket websocket;
     private Set<WsConsumer> consumers;
-    private URI wsUri;
-    private boolean throwExceptionOnFailure = true;
-    private boolean transferException;
-    private SSLContextParameters sslContextParameters;
     private boolean useStreaming;
 
     private static boolean probeClass(String name) {
@@ -73,7 +64,7 @@ public class WsEndpoint extends DefaultEndpoint {
     }
     
     public WsEndpoint(String endpointUri, WsComponent component) {
-        super(endpointUri, component);
+        super(endpointUri, component, null);
         this.consumers = new HashSet<WsConsumer>();
     }
 
@@ -92,20 +83,13 @@ public class WsEndpoint extends DefaultEndpoint {
         return new WsConsumer(this, processor);
     }
 
-
-    @Override
-    public boolean isSingleton() {
-        return true;
-    }
-
     WebSocket getWebSocket() {
         synchronized (this) {
             if (websocket == null) {
                 try { 
                     connect();
                 } catch (Exception e) {
-                    // TODO add the throw exception in the method 
-                    e.printStackTrace();
+                    LOG.error("Failed to connect", e);
                 }
             }
         }
@@ -116,46 +100,6 @@ public class WsEndpoint extends DefaultEndpoint {
         this.websocket = websocket;
     }
 
-    public AsyncHttpClientConfig getClientConfig() {
-        return clientConfig;
-    }
-
-    public void setClientConfig(AsyncHttpClientConfig clientConfig) {
-        this.clientConfig = clientConfig;
-    }
-
-    public boolean isThrowExceptionOnFailure() {
-        return throwExceptionOnFailure;
-    }
-
-    public void setThrowExceptionOnFailure(boolean throwExceptionOnFailure) {
-        this.throwExceptionOnFailure = throwExceptionOnFailure;
-    }
-
-    public boolean isTransferException() {
-        return transferException;
-    }
-
-    public void setTransferException(boolean transferException) {
-        this.transferException = transferException;
-    }
-    
-    public SSLContextParameters getSslContextParameters() {
-        return sslContextParameters;
-    }
-
-    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
-        this.sslContextParameters = sslContextParameters;
-    }
-
-    public URI getWsUri() {
-        return wsUri;
-    }
-
-    public void setWsUri(URI wsUri) {
-        this.wsUri = wsUri;
-    }
-
     /**
      * @return the useStreaming
      */
@@ -170,60 +114,36 @@ public class WsEndpoint extends DefaultEndpoint {
         this.useStreaming = useStreaming;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.camel.component.ahc.AhcEndpoint#createClient(com.ning.http.client.AsyncHttpClientConfig)
+     */
+    @Override
+    protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
+        AsyncHttpClient client;
+        if (config == null) {
+            config = new AsyncHttpClientConfig.Builder().build();
+        }
+        AsyncHttpProvider ahp = getAsyncHttpProvider(config);
+        if (ahp == null) {
+            client = new AsyncHttpClient(config);
+        } else {
+            client = new AsyncHttpClient(ahp, config);
+        }
+        return client; 
+    }
+
     public void connect() throws InterruptedException, ExecutionException, IOException {
-        websocket = client.prepareGet(wsUri.toASCIIString()).execute(
+        websocket = getClient().prepareGet(getHttpUri().toASCIIString()).execute(
             new WebSocketUpgradeHandler.Builder()
                 .addWebSocketListener(new WsListener()).build()).get();
     }
     
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-        if (client == null) {
-            
-            AsyncHttpClientConfig config = null;
-            
-            if (clientConfig != null) {
-                AsyncHttpClientConfig.Builder builder = WsComponent.cloneConfig(clientConfig);
-                
-                if (sslContextParameters != null) {
-                    SSLContext ssl = sslContextParameters.createSSLContext();
-                    builder.setSSLContext(ssl);
-                }
-                
-                config = builder.build();
-            } else {
-                if (sslContextParameters != null) {
-                    AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
-                    SSLContext ssl = sslContextParameters.createSSLContext();
-                    builder.setSSLContext(ssl);
-                    config = builder.build();
-                }
-            }
-            
-            if (config == null) {
-                config = new AsyncHttpClientConfig.Builder().build();
-            }
-            
-            AsyncHttpProvider ahp = getAsyncHttpProvider(config);
-            if (ahp == null) {
-                client = new AsyncHttpClient(config);
-            } else {
-                client = new AsyncHttpClient(ahp, config);
-            }
-        }
-    }
-
-    @Override
     protected void doStop() throws Exception {
-        super.doStop();
         if (websocket != null && websocket.isOpen()) {
             websocket.close();
         }
-        if (client != null && !client.isClosed()) {
-            client.close();
-        }
-        client = null;
+        super.doStop();
     }
 
     void connect(WsConsumer wsConsumer) {