You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/12 02:48:24 UTC
svn commit: r1576556 - in
/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube:
DisruptorPathTrackingDataStore.java HttpClientCube.java
Author: olamy
Date: Wed Mar 12 01:48:23 2014
New Revision: 1576556
URL: http://svn.apache.org/r1576556
Log:
More configurable and better connection management for the HTTP client.
Modified:
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java?rev=1576556&r1=1576555&r2=1576556&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java Wed Mar 12 01:48:23 2014
@@ -17,14 +17,16 @@
package org.apache.sirona.cube;
+import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.AutoSet;
+import org.apache.sirona.configuration.ioc.Created;
import org.apache.sirona.configuration.ioc.Destroying;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
@@ -40,47 +42,38 @@ import java.util.concurrent.Executors;
/**
*
*/
+@AutoSet
public class DisruptorPathTrackingDataStore
extends BatchPathTrackingDataStore
implements CollectorPathTrackingDataStore
{
private static final Cube CUBE = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
-
- private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
- Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.useexecutors", "false" ) );
-
private static boolean USE_SINGLE_STORE = Boolean.parseBoolean(
Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.singlestore", "false" ) );
- protected static ExecutorService executorService;
+ private RingBuffer<PathTrackingEntry> ringBuffer;
- private static RingBuffer<PathTrackingEntry> RINGBUFFER;
-
- static
- {
+ private Disruptor<PathTrackingEntry> disruptor;
- if ( USE_EXECUTORS )
- {
- int threadsNumber =
- Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.executors", 5 );
- executorService = Executors.newFixedThreadPool( threadsNumber );
-
- }
+ private int ringBufferSize = 4096;
+ @Created
+ public void initialize()
+ {
ExecutorService exec = Executors.newCachedThreadPool();
// FIXME make configurable: ring buffer size and WaitStrategy
- Disruptor<PathTrackingEntry> disruptor = new Disruptor<PathTrackingEntry>( new EventFactory<PathTrackingEntry>()
+ disruptor = new Disruptor<PathTrackingEntry>( new EventFactory<PathTrackingEntry>()
{
@Override
public PathTrackingEntry newInstance()
{
return new PathTrackingEntry();
}
- }, 2048, exec, ProducerType.SINGLE, new SleepingWaitStrategy()
+ }, ringBufferSize, exec, ProducerType.SINGLE, new BusySpinWaitStrategy()
);
final EventHandler<PathTrackingEntry> handler = new EventHandler<PathTrackingEntry>()
@@ -95,7 +88,7 @@ public class DisruptorPathTrackingDataSt
disruptor.handleEventsWith( handler );
- RINGBUFFER = disruptor.start();
+ ringBuffer = disruptor.start();
}
@@ -103,7 +96,7 @@ public class DisruptorPathTrackingDataSt
public void store( final PathTrackingEntry pathTrackingEntry )
{
- RINGBUFFER.publishEvent( new EventTranslator<PathTrackingEntry>()
+ ringBuffer.publishEvent( new EventTranslator<PathTrackingEntry>()
{
@Override
public void translateTo( PathTrackingEntry event, long sequence )
@@ -139,10 +132,21 @@ public class DisruptorPathTrackingDataSt
}
}
+ public RingBuffer<PathTrackingEntry> getRingBuffer()
+ {
+ return ringBuffer;
+ }
+
+ public void setRingBuffer( RingBuffer<PathTrackingEntry> ringBuffer )
+ {
+ this.ringBuffer = ringBuffer;
+ }
+
@Destroying
public void destroy()
{
- executorService.shutdownNow();
+ // FIXME timeout??
+ disruptor.shutdown();
}
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java?rev=1576556&r1=1576555&r2=1576556&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/HttpClientCube.java Wed Mar 12 01:48:23 2014
@@ -16,12 +16,14 @@
*/
package org.apache.sirona.cube;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import java.io.IOException;
@@ -37,10 +39,12 @@ public class HttpClientCube
extends Cube
{
- private static final Logger LOGGER = Logger.getLogger(HttpClientCube.class.getName());
+ private static final Logger LOGGER = Logger.getLogger( HttpClientCube.class.getName() );
private HttpClient httpclient;
+ private RequestConfig requestConfig;
+
public HttpClientCube( CubeBuilder cubeBuilder )
{
super( cubeBuilder );
@@ -50,10 +54,22 @@ public class HttpClientCube
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
- builder.setConnectionManager( connectionManager );
+ // FIXME configurable
+ connectionManager.setMaxTotal( 10 );
+
+ connectionManager.setDefaultMaxPerRoute( 10 );
+
+ builder = builder.setConnectionManager( connectionManager );
httpclient = builder.build();
+ // FIXME configurable
+
+ requestConfig = RequestConfig.custom() //
+ .setSocketTimeout( 5000 ) //
+ .setConnectTimeout( 5000 ) //
+ .setConnectionRequestTimeout( 5000 ) //
+ .build();
}
catch ( Exception e )
@@ -69,30 +85,49 @@ public class HttpClientCube
try
{
- final URI uri = new URI(getConfig().getCollector());
+ final URI uri = new URI( getConfig().getCollector() );
HttpPost httpPost = new HttpPost( uri );
httpPost.setEntity( new ByteArrayEntity( bytes ) );
httpPost.setHeader( CONTENT_TYPE, APPLICATION_JAVA_OBJECT );
httpPost.setHeader( X_SIRONA_CLASSNAME, className );
- httpclient.execute( httpPost );
+ httpPost.setConfig( requestConfig );
+
+ httpclient.execute( httpPost, new ResponseHandler<HttpResponse>()
+ {
+ public HttpResponse handleResponse( HttpResponse httpResponse )
+ throws ClientProtocolException, IOException
+ {
+ int status = httpResponse.getStatusLine().getStatusCode();
+ if ( status != 200 )
+ {
+ LOGGER.warning( "Pushed data but response code is: " + status + //
+ ", reason:" + httpResponse.getStatusLine().getReasonPhrase() );
+ }
+ return httpResponse;
+ }
+ } );
+
}
catch ( URISyntaxException e )
{
- if (LOGGER.isLoggable( Level.FINE ) )
+ if ( LOGGER.isLoggable( Level.FINE ) )
{
- LOGGER.log(Level.FINE, "Can't post data to collector:" + e.getMessage(),e);
- } else
+ LOGGER.log( Level.FINE, "Can't post data to collector:" + e.getMessage(), e );
+ }
+ else
{
LOGGER.log( Level.WARNING, "Can't post data to collector: " + e.getMessage() );
}
- } catch ( IOException e)
+ }
+ catch ( IOException e )
{
- if (LOGGER.isLoggable( Level.FINE ) )
+ if ( LOGGER.isLoggable( Level.FINE ) )
{
- LOGGER.log(Level.FINE, "Can't post data to collector:" + e.getMessage(),e);
- } else
+ LOGGER.log( Level.FINE, "Can't post data to collector:" + e.getMessage(), e );
+ }
+ else
{
LOGGER.log( Level.WARNING, "Can't post data to collector: " + e.getMessage() );
}