You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/28 07:08:16 UTC

svn commit: r1582621 - in /incubator/sirona/trunk: agent/store/cube/src/main/java/org/apache/sirona/cube/ server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/

Author: olamy
Date: Fri Mar 28 06:08:16 2014
New Revision: 1582621

URL: http://svn.apache.org/r1582621
Log:
Make it possible to record path-tracking events asynchronously in Cassandra; add multiple consumers for the Disruptor.

Modified:
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
    incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java?rev=1582621&r1=1582620&r2=1582621&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/DisruptorPathTrackingDataStore.java Fri Mar 28 06:08:16 2014
@@ -76,19 +76,40 @@ public class DisruptorPathTrackingDataSt
         }, ringBufferSize, exec, ProducerType.SINGLE, new BusySpinWaitStrategy()
         );
 
-        final EventHandler<PathTrackingEntry> handler = new EventHandler<PathTrackingEntry>()
+        // FIXME configurable
+        int numberOfConsumers = 4;
+
+        for ( int i = 0; i < numberOfConsumers; i++ )
+        {
+            System.out.println( "create PathTrackingEntryEventHandler" );
+            disruptor.handleEventsWith( new PathTrackingEntryEventHandler( i, numberOfConsumers ) );
+        }
+        ringBuffer = disruptor.start();
+
+    }
+
+    private static class PathTrackingEntryEventHandler
+        implements EventHandler<PathTrackingEntry>
+    {
+
+        private final long ordinal;
+
+        private final long numberOfConsumers;
+
+        public PathTrackingEntryEventHandler( final long ordinal, final long numberOfConsumers )
+        {
+            this.ordinal = ordinal;
+            this.numberOfConsumers = numberOfConsumers;
+        }
+
+        public void onEvent( final PathTrackingEntry entry, final long sequence, final boolean endOfBatch )
+            throws Exception
         {
-            // event will eventually be recycled by the Disruptor after it wraps
-            public void onEvent( final PathTrackingEntry entry, final long sequence, final boolean endOfBatch )
-                throws Exception
+            if ( ( sequence % numberOfConsumers ) == ordinal )
             {
                 CUBE.doPostBytes( SerializeUtils.serialize( entry ), PathTrackingEntry.class.getName() );
             }
-        };
-
-        disruptor.handleEventsWith( handler );
-
-        ringBuffer = disruptor.start();
+        }
 
     }
 

Modified: incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java?rev=1582621&r1=1582620&r2=1582621&view=diff
==============================================================================
--- incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/collector/pathtracking/CassandraPathTrackingDataStore.java Fri Mar 28 06:08:16 2014
@@ -29,6 +29,7 @@ import me.prettyprint.hector.api.mutatio
 import me.prettyprint.hector.api.query.QueryResult;
 import org.apache.sirona.cassandra.DynamicDelegatedSerializer;
 import org.apache.sirona.cassandra.collector.CassandraSirona;
+import org.apache.sirona.configuration.Configuration;
 import org.apache.sirona.configuration.ioc.IoCs;
 import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
 import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
@@ -46,6 +47,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.apache.sirona.cassandra.collector.CassandraSirona.*;
 
@@ -85,6 +88,26 @@ public class CassandraPathTrackingDataSt
     };
 
 
+    private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
+        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.cassandra.useexecutors",
+                                   "false" )
+    );
+
+
+    protected static ExecutorService EXECUTORSERVICE;
+
+    static
+    {
+
+        if ( USE_EXECUTORS )
+        {
+            int threadsNumber =
+                Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.cassandra.executors",
+                                          5 );
+            EXECUTORSERVICE = Executors.newFixedThreadPool( threadsNumber );
+        }
+    }
+
     public CassandraPathTrackingDataStore()
     {
         this.cassandra = IoCs.findOrCreateInstance( CassandraSirona.class );
@@ -94,9 +117,26 @@ public class CassandraPathTrackingDataSt
     }
 
     @Override
-    public void store( PathTrackingEntry pathTrackingEntry )
+    public void store( final PathTrackingEntry pathTrackingEntry )
     {
-        store( Collections.singletonList( pathTrackingEntry ) );
+        Runnable runnable = new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                store( Collections.singletonList( pathTrackingEntry ) );
+            }
+        };
+
+        if ( USE_EXECUTORS )
+        {
+            EXECUTORSERVICE.submit( runnable );
+        }
+        else
+        {
+            runnable.run();
+        }
+
     }
 
     @Override