You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/12 02:48:24 UTC

svn commit: r1576556 - in /incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube: DisruptorPathTrackingDataStore.java HttpClientCube.java

Author: olamy
Date: Wed Mar 12 01:48:23 2014
New Revision: 1576556

URL: http://svn.apache.org/r1576556
Log:
More configurable, better connection management for the HTTP client

Modified:
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java?rev=1576556&r1=1576555&r2=1576556&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java Wed Mar 12 01:48:23 2014
@@ -17,14 +17,16 @@
 
 package org.apache.sirona.cube;
 
+import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.EventTranslator;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SleepingWaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.AutoSet;
+import org.apache.sirona.configuration.ioc.Created;
 import org.apache.sirona.configuration.ioc.Destroying;
 import org.apache.sirona.configuration.ioc.IoCs;
 import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
@@ -40,47 +42,38 @@ import java.util.concurrent.Executors;
 /**
  *
  */
+@AutoSet
 public class DisruptorPathTrackingDataStore
     extends BatchPathTrackingDataStore
     implements CollectorPathTrackingDataStore
 {
     private static final Cube CUBE = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
 
-
-    private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
-        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.useexecutors", "false" ) );
-
     private static boolean USE_SINGLE_STORE = Boolean.parseBoolean(
         Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.singlestore", "false" ) );
 
 
-    protected static ExecutorService executorService;
+    private RingBuffer<PathTrackingEntry> ringBuffer;
 
-    private static RingBuffer<PathTrackingEntry> RINGBUFFER;
-
-    static
-    {
+    private Disruptor<PathTrackingEntry> disruptor;
 
-        if ( USE_EXECUTORS )
-        {
-            int threadsNumber =
-                Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.executors", 5 );
-            executorService = Executors.newFixedThreadPool( threadsNumber );
-
-        }
+    private int ringBufferSize = 4096;
 
+    @Created
+    public void initialize()
+    {
         ExecutorService exec = Executors.newCachedThreadPool();
 
         // FIXME make configurable: ring buffer size and WaitStrategy
 
-        Disruptor<PathTrackingEntry> disruptor = new Disruptor<PathTrackingEntry>( new EventFactory<PathTrackingEntry>()
+        disruptor = new Disruptor<PathTrackingEntry>( new EventFactory<PathTrackingEntry>()
         {
             @Override
             public PathTrackingEntry newInstance()
             {
                 return new PathTrackingEntry();
             }
-        }, 2048, exec, ProducerType.SINGLE, new SleepingWaitStrategy()
+        }, ringBufferSize, exec, ProducerType.SINGLE, new BusySpinWaitStrategy()
         );
 
         final EventHandler<PathTrackingEntry> handler = new EventHandler<PathTrackingEntry>()
@@ -95,7 +88,7 @@ public class DisruptorPathTrackingDataSt
 
         disruptor.handleEventsWith( handler );
 
-        RINGBUFFER = disruptor.start();
+        ringBuffer = disruptor.start();
 
     }
 
@@ -103,7 +96,7 @@ public class DisruptorPathTrackingDataSt
     public void store( final PathTrackingEntry pathTrackingEntry )
     {
 
-        RINGBUFFER.publishEvent( new EventTranslator<PathTrackingEntry>()
+        ringBuffer.publishEvent( new EventTranslator<PathTrackingEntry>()
         {
             @Override
             public void translateTo( PathTrackingEntry event, long sequence )
@@ -139,10 +132,21 @@ public class DisruptorPathTrackingDataSt
         }
     }
 
+    public RingBuffer<PathTrackingEntry> getRingBuffer()
+    {
+        return ringBuffer;
+    }
+
+    public void setRingBuffer( RingBuffer<PathTrackingEntry> ringBuffer )
+    {
+        this.ringBuffer = ringBuffer;
+    }
+
     @Destroying
     public void destroy()
     {
-        executorService.shutdownNow();
+        // FIXME timeout??
+        disruptor.shutdown();
     }
 
 

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java?rev=1576556&r1=1576555&r2=1576556&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java Wed Mar 12 01:48:23 2014
@@ -16,12 +16,14 @@
  */
 package org.apache.sirona.cube;
 
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.conn.ssl.SSLContexts;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 
 import java.io.IOException;
@@ -37,10 +39,12 @@ public class HttpClientCube
     extends Cube
 {
 
-    private static final Logger LOGGER = Logger.getLogger(HttpClientCube.class.getName());
+    private static final Logger LOGGER = Logger.getLogger( HttpClientCube.class.getName() );
 
     private HttpClient httpclient;
 
+    private RequestConfig requestConfig;
+
     public HttpClientCube( CubeBuilder cubeBuilder )
     {
         super( cubeBuilder );
@@ -50,10 +54,22 @@ public class HttpClientCube
 
             PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
 
-            builder.setConnectionManager( connectionManager );
+            // FIXME configurable
+            connectionManager.setMaxTotal( 10 );
+
+            connectionManager.setDefaultMaxPerRoute( 10 );
+
+            builder = builder.setConnectionManager( connectionManager );
 
             httpclient = builder.build();
 
+            // FIXME configurable
+
+            requestConfig = RequestConfig.custom() //
+                .setSocketTimeout( 5000 ) //
+                .setConnectTimeout( 5000 ) //
+                .setConnectionRequestTimeout( 5000 ) //
+                .build();
 
         }
         catch ( Exception e )
@@ -69,30 +85,49 @@ public class HttpClientCube
 
         try
         {
-            final URI uri = new URI(getConfig().getCollector());
+            final URI uri = new URI( getConfig().getCollector() );
 
             HttpPost httpPost = new HttpPost( uri );
             httpPost.setEntity( new ByteArrayEntity( bytes ) );
             httpPost.setHeader( CONTENT_TYPE, APPLICATION_JAVA_OBJECT );
             httpPost.setHeader( X_SIRONA_CLASSNAME, className );
 
-            httpclient.execute( httpPost );
+            httpPost.setConfig( requestConfig );
+
+            httpclient.execute( httpPost, new ResponseHandler<HttpResponse>()
+            {
+                public HttpResponse handleResponse( HttpResponse httpResponse )
+                    throws ClientProtocolException, IOException
+                {
+                    int status = httpResponse.getStatusLine().getStatusCode();
+                    if ( status != 200 )
+                    {
+                        LOGGER.warning( "Pushed data but response code is: " + status + //
+                                            ", reason:" + httpResponse.getStatusLine().getReasonPhrase() );
+                    }
+                    return httpResponse;
+                }
+            } );
+
         }
         catch ( URISyntaxException e )
         {
-            if (LOGGER.isLoggable( Level.FINE ) )
+            if ( LOGGER.isLoggable( Level.FINE ) )
             {
-                LOGGER.log(Level.FINE, "Can't post data to collector:" + e.getMessage(),e);
-            } else
+                LOGGER.log( Level.FINE, "Can't post data to collector:" + e.getMessage(), e );
+            }
+            else
             {
                 LOGGER.log( Level.WARNING, "Can't post data to collector: " + e.getMessage() );
             }
-        } catch ( IOException e)
+        }
+        catch ( IOException e )
         {
-            if (LOGGER.isLoggable( Level.FINE ) )
+            if ( LOGGER.isLoggable( Level.FINE ) )
             {
-                LOGGER.log(Level.FINE, "Can't post data to collector:" + e.getMessage(),e);
-            } else
+                LOGGER.log( Level.FINE, "Can't post data to collector:" + e.getMessage(), e );
+            }
+            else
             {
                 LOGGER.log( Level.WARNING, "Can't post data to collector: " + e.getMessage() );
             }