You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/28 07:08:16 UTC
svn commit: r1582621 - in /incubator/sirona/trunk:
agent/store/cube/src/main/java/org/apache/sirona/cube/
server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/
Author: olamy
Date: Fri Mar 28 06:08:16 2014
New Revision: 1582621
URL: http://svn.apache.org/r1582621
Log:
Make it possible to record path-tracking events asynchronously in Cassandra; add multiple consumers for the Disruptor.
Modified:
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java?rev=1582621&r1=1582620&r2=1582621&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java Fri Mar 28 06:08:16 2014
@@ -76,19 +76,40 @@ public class DisruptorPathTrackingDataSt
}, ringBufferSize, exec, ProducerType.SINGLE, new BusySpinWaitStrategy()
);
- final EventHandler<PathTrackingEntry> handler = new EventHandler<PathTrackingEntry>()
+ // FIXME configurable
+ int numberOfConsumers = 4;
+
+ for ( int i = 0; i < numberOfConsumers; i++ )
+ {
+ System.out.println( "create PathTrackingEntryEventHandler" );
+ disruptor.handleEventsWith( new PathTrackingEntryEventHandler( i, numberOfConsumers ) );
+ }
+ ringBuffer = disruptor.start();
+
+ }
+
+ private static class PathTrackingEntryEventHandler
+ implements EventHandler<PathTrackingEntry>
+ {
+
+ private final long ordinal;
+
+ private final long numberOfConsumers;
+
+ public PathTrackingEntryEventHandler( final long ordinal, final long numberOfConsumers )
+ {
+ this.ordinal = ordinal;
+ this.numberOfConsumers = numberOfConsumers;
+ }
+
+ public void onEvent( final PathTrackingEntry entry, final long sequence, final boolean endOfBatch )
+ throws Exception
{
- // event will eventually be recycled by the Disruptor after it wraps
- public void onEvent( final PathTrackingEntry entry, final long sequence, final boolean endOfBatch )
- throws Exception
+ if ( ( sequence % numberOfConsumers ) == ordinal )
{
CUBE.doPostBytes( SerializeUtils.serialize( entry ), PathTrackingEntry.class.getName() );
}
- };
-
- disruptor.handleEventsWith( handler );
-
- ringBuffer = disruptor.start();
+ }
}
Modified: incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java?rev=1582621&r1=1582620&r2=1582621&view=diff
==============================================================================
--- incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java Fri Mar 28 06:08:16 2014
@@ -29,6 +29,7 @@ import me.prettyprint.hector.api.mutatio
import me.prettyprint.hector.api.query.QueryResult;
import org.apache.sirona.cassandra.DynamicDelegatedSerializer;
import org.apache.sirona.cassandra.collector.CassandraSirona;
+import org.apache.sirona.configuration.Configuration;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
@@ -46,6 +47,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.apache.sirona.cassandra.collector.CassandraSirona.*;
@@ -85,6 +88,26 @@ public class CassandraPathTrackingDataSt
};
+ private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
+ Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.cassandra.useexecutors",
+ "false" )
+ );
+
+
+ protected static ExecutorService EXECUTORSERVICE;
+
+ static
+ {
+
+ if ( USE_EXECUTORS )
+ {
+ int threadsNumber =
+ Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.cassandra.executors",
+ 5 );
+ EXECUTORSERVICE = Executors.newFixedThreadPool( threadsNumber );
+ }
+ }
+
public CassandraPathTrackingDataStore()
{
this.cassandra = IoCs.findOrCreateInstance( CassandraSirona.class );
@@ -94,9 +117,26 @@ public class CassandraPathTrackingDataSt
}
@Override
- public void store( PathTrackingEntry pathTrackingEntry )
+ public void store( final PathTrackingEntry pathTrackingEntry )
{
- store( Collections.singletonList( pathTrackingEntry ) );
+ Runnable runnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ store( Collections.singletonList( pathTrackingEntry ) );
+ }
+ };
+
+ if ( USE_EXECUTORS )
+ {
+ EXECUTORSERVICE.submit( runnable );
+ }
+ else
+ {
+ runnable.run();
+ }
+
}
@Override