You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/06 01:02:45 UTC

svn commit: r1574725 - in /incubator/sirona/trunk: agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/ agent/store/cube/src/main/java/org/apache/sirona/cube/ core/src/main/java/org/apache/sirona/store/tracking/ core/src/main/java/org/apache...

Author: olamy
Date: Thu Mar  6 00:02:45 2014
New Revision: 1574725

URL: http://svn.apache.org/r1574725
Log:
using unsafe to store path tracking entries

Modified:
    incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java

Modified: incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java (original)
+++ incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java Thu Mar  6 00:02:45 2014
@@ -21,6 +21,7 @@ import org.apache.sirona.javaagent.Agent
 import org.apache.sirona.javaagent.JavaAgentRunner;
 import org.apache.sirona.pathtracking.test.ExtendedInMemoryPathTrackingDataStore;
 import org.apache.sirona.store.DataStoreFactory;
+import org.apache.sirona.tracking.PathTracker;
 import org.apache.sirona.tracking.PathTrackingEntry;
 import org.junit.Assert;
 import org.junit.Test;

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Thu Mar  6 00:02:45 2014
@@ -160,30 +160,26 @@ public class Cube {
             connection.setUseCaches(false);
             connection.setDoInput(true);
             connection.setDoOutput(true);
-
+            connection.setReadTimeout( config.getPostTimeout() );
             OutputStream output = null;
 
+
+            output = connection.getOutputStream();
             try {
-                output = connection.getOutputStream();
-                try {
-                    // FIXME find a more efficient way to prevent to have all of this in memory
-                    output.write( bytes );
-                    output.flush();
-
-                    final int status = connection.getResponseCode();
-                    if (status / 100 != 2) {
-                        LOGGER.warning("Pushed data but response code is: " + status);
-                    }
-                } finally {
-                    if (output != null) {
-                        output.close();
-                    }
+                // FIXME find a more efficient way to prevent to have all of this in memory
+                output.write( bytes );
+                output.flush();
+
+                final int status = connection.getResponseCode();
+                if (status / 100 != 2) {
+                    LOGGER.warning("Pushed data but response code is: " + status);
                 }
             } finally {
                 if (output != null) {
                     output.close();
                 }
             }
+
         } catch (final Exception e) {
             LOGGER.log(Level.WARNING, "Can't post data to collector", e);
         }

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java Thu Mar  6 00:02:45 2014
@@ -48,8 +48,17 @@ public class CubeBuilder {
     private String sslKeyStorePassword;
     private String sslKeyStoreProvider;
     private String basicHeader; // user:pwd
+
+    /**
+     * compression off per default
+     */
     private boolean useCompression = false;
 
+    /**
+     * default timeout of 5s
+     */
+    private int postTimeout = 5000;
+
     public synchronized Cube build() {
         if (marker == null) {
             marker = Localhosts.get();
@@ -110,6 +119,16 @@ public class CubeBuilder {
         this.useCompression = useCompression;
     }
 
+    public int getPostTimeout()
+    {
+        return postTimeout;
+    }
+
+    public void setPostTimeout( int postTimeout )
+    {
+        this.postTimeout = postTimeout;
+    }
+
     private TrustManager[] createTrustManager() {
         if (sslTrustStore == null) {
             return null;

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java Thu Mar  6 00:02:45 2014
@@ -17,6 +17,8 @@
 
 package org.apache.sirona.cube;
 
+import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Destroying;
 import org.apache.sirona.configuration.ioc.IoCs;
 import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
 import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
@@ -25,6 +27,8 @@ import org.apache.sirona.tracking.PathTr
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  *
@@ -36,25 +40,65 @@ public class CubePathTrackingDataStore
     private final Cube cube = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
 
 
+    private static final boolean useExecutors = Boolean.parseBoolean(
+        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.useexecutors", "false" ) );
+
+
+    protected static ExecutorService executorService;
+
+    static
+    {
+
+        if ( useExecutors )
+        {
+            int threadsNumber =
+                Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.executors", 5 );
+            executorService = Executors.newFixedThreadPool( threadsNumber );
+
+        }
+    }
+
     @Override
     public void store( Collection<PathTrackingEntry> pathTrackingEntries )
     {
-        for (PathTrackingEntry pathTrackingEntry : pathTrackingEntries)
+
+        for ( final PathTrackingEntry pathTrackingEntry : pathTrackingEntries )
         {
-            cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
+            Runnable runnable = new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
+                }
+            };
+            if ( useExecutors )
+            {
+                executorService.submit( runnable );
+            }
+            else
+            {
+                runnable.run();
+            }
+
         }
     }
 
     @Override
-    protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries ) {
-        for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet())
+    protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries )
+    {
+        for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet() )
         {
-            for (PathTrackingEntry pathTrackingEntry : entry.getValue())
+            for ( PathTrackingEntry pathTrackingEntry : entry.getValue() )
             {
                 cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
             }
         }
     }
 
-
+    @Destroying
+    public void destroy()
+    {
+        executorService.shutdownNow();
+    }
 }

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java Thu Mar  6 00:02:45 2014
@@ -27,7 +27,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.ObjectStreamClass;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,9 +49,10 @@ public class InMemoryPathTrackingDataSto
 {
     /**
      * store path track tracking entries list per path tracking id
+     * the value is the memory address
      */
-    private ConcurrentMap<String, List<ByteBuffer>> pathTrackingEntries =
-        new ConcurrentHashMap<String, List<ByteBuffer>>( 50 );
+    private ConcurrentMap<String, List<Pointer>> pathTrackingEntries =
+        new ConcurrentHashMap<String, List<Pointer>>( 50 );
 
     @Override
     public void store( PathTrackingEntry pathTrackingEntry )
@@ -84,11 +84,11 @@ public class InMemoryPathTrackingDataSto
 
         for ( Map.Entry<String, Set<PathTrackingEntry>> entry : entries.entrySet() )
         {
-            List<ByteBuffer> entriesList = this.pathTrackingEntries.get( entry.getKey() );
+            List<Pointer> entriesList = this.pathTrackingEntries.get( entry.getKey() );
 
             if ( entriesList == null )
             {
-                entriesList = new ArrayList<ByteBuffer>();
+                entriesList = new ArrayList<Pointer>();
             }
             entriesList.addAll( serialize( entry.getValue() ) );
             this.pathTrackingEntries.put( entry.getKey(), entriesList );
@@ -99,7 +99,7 @@ public class InMemoryPathTrackingDataSto
     @Override
     public Collection<PathTrackingEntry> retrieve( String trackingId )
     {
-        List<ByteBuffer> buffers = this.pathTrackingEntries.get( trackingId );
+        List<Pointer> buffers = this.pathTrackingEntries.get( trackingId );
 
         return deserialize( buffers );
     }
@@ -108,14 +108,14 @@ public class InMemoryPathTrackingDataSto
     public Collection<String> retrieveTrackingIds( Date startTime, Date endTime )
     {
         List<String> trackingIds = new ArrayList<String>();
-        for ( List<ByteBuffer> buffers : this.pathTrackingEntries.values() )
+        for ( List<Pointer> buffers : this.pathTrackingEntries.values() )
         {
             if ( pathTrackingEntries.isEmpty() )
             {
                 continue;
             }
 
-            PathTrackingEntry first = deserialize( buffers.iterator().next().array() );
+            PathTrackingEntry first = deserialize( readBytes( buffers.iterator().next() ) );
 
             if ( first.getStartTime() / 1000000 > startTime.getTime() //
                 && first.getStartTime() / 1000000 < endTime.getTime() )
@@ -126,16 +126,13 @@ public class InMemoryPathTrackingDataSto
         return trackingIds;
     }
 
-    private Collection<PathTrackingEntry> deserialize( List<ByteBuffer> buffers )
+    private Collection<PathTrackingEntry> deserialize( List<Pointer> buffers )
     {
         List<PathTrackingEntry> entries = new ArrayList<PathTrackingEntry>( buffers.size() );
 
-        for ( ByteBuffer byteBuffer : buffers )
+        for ( Pointer pointer : buffers )
         {
-            byteBuffer.rewind();
-            int size = byteBuffer.remaining();
-            byte[] bytes = new byte[size];
-            byteBuffer.get( bytes, 0, size );
+            byte[] bytes = readBytes( pointer );
 
             PathTrackingEntry entry = deserialize( bytes );
             if ( entry != null )
@@ -147,18 +144,44 @@ public class InMemoryPathTrackingDataSto
         return entries;
     }
 
-    private List<ByteBuffer> serialize( Collection<PathTrackingEntry> entries )
+    private byte[] readBytes( Pointer pointer )
     {
-        List<ByteBuffer> buffers = new ArrayList<ByteBuffer>( entries.size() );
+        byte[] bytes = new byte[pointer.size];
+        int length = pointer.size;
+        long offset = pointer.offheapPointer;
+        for ( int pos = 0; pos < length; pos++ )
+        {
+            bytes[pos] = getUnsafe().getByte( pos + offset );
+        }
+        return bytes;
+    }
+
+    private static class Pointer
+    {
+        int size;
+
+        long offheapPointer;
+    }
+
+    private List<Pointer> serialize( Collection<PathTrackingEntry> entries )
+    {
+        List<Pointer> buffers = new ArrayList<Pointer>( entries.size() );
 
         for ( PathTrackingEntry entry : entries )
         {
             byte[] bytes = serialize( entry );
             if ( bytes != null )
             {
-                ByteBuffer buffer = ByteBuffer.allocateDirect( bytes.length );
-                buffer.put( bytes );
-                buffers.add( buffer );
+                long offheapPointer = getUnsafe().allocateMemory( bytes.length );
+                Pointer pointer = new Pointer();
+                pointer.offheapPointer = offheapPointer;
+                pointer.size = bytes.length;
+                for ( int i = 0, size = bytes.length; i < size; i++ )
+                {
+                    getUnsafe().putByte( offheapPointer + i, bytes[i] );
+                }
+                buffers.add( pointer );
+
             }
         }
 
@@ -168,15 +191,15 @@ public class InMemoryPathTrackingDataSto
     @Override
     public void clearEntries()
     {
-        for ( Map.Entry<String, List<ByteBuffer>> entry : pathTrackingEntries.entrySet() )
+        for ( Map.Entry<String, List<Pointer>> entry : pathTrackingEntries.entrySet() )
         {
             // clear entries to not wait gc
-            for ( ByteBuffer byteBuffer : entry.getValue() )
+            for ( Pointer pointer : entry.getValue() )
             {
-                byteBuffer.clear();
+                getUnsafe().freeMemory( pointer.offheapPointer );
             }
         }
-        pathTrackingEntries = new ConcurrentHashMap<String, List<ByteBuffer>>( 50 );
+        pathTrackingEntries = new ConcurrentHashMap<String, List<Pointer>>( 50 );
     }
 
     protected Map<String, Set<PathTrackingEntry>> getPathTrackingEntries()
@@ -185,7 +208,7 @@ public class InMemoryPathTrackingDataSto
         Map<String, Set<PathTrackingEntry>> entries =
             new HashMap<String, Set<PathTrackingEntry>>( this.pathTrackingEntries.size() );
 
-        for ( Map.Entry<String, List<ByteBuffer>> entry : this.pathTrackingEntries.entrySet() )
+        for ( Map.Entry<String, List<Pointer>> entry : this.pathTrackingEntries.entrySet() )
         {
             Set<PathTrackingEntry> pathTrackingEntries =
                 new TreeSet<PathTrackingEntry>( PathTrackingEntryComparator.INSTANCE );
@@ -241,6 +264,7 @@ public class InMemoryPathTrackingDataSto
             // ignore as should not happen anyway log the stack trace
             e.printStackTrace();
         }
+
         return null;
     }
 

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java Thu Mar  6 00:02:45 2014
@@ -18,6 +18,7 @@ package org.apache.sirona.tracking;
 
 
 import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Destroying;
 import org.apache.sirona.configuration.ioc.IoCs;
 import org.apache.sirona.store.DataStoreFactory;
 import org.apache.sirona.store.tracking.PathTrackingDataStore;
@@ -25,6 +26,8 @@ import org.apache.sirona.store.tracking.
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -32,9 +35,9 @@ import java.util.concurrent.atomic.Atomi
  */
 public class PathTracker
 {
-    private static final String NODE = Configuration.getProperty(
-        Configuration.CONFIG_PROPERTY_PREFIX + "javaagent.path.tracking.marker", //
-        Configuration.getProperty( "org.apache.sirona.cube.CubeBuilder.marker", "node" ));
+    private static final String NODE =
+        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "javaagent.path.tracking.marker", //
+                                   Configuration.getProperty( "org.apache.sirona.cube.CubeBuilder.marker", "node" ) );
 
     private static final PathTrackingDataStore PATH_TRACKING_DATA_STORE =
         IoCs.findOrCreateInstance( DataStoreFactory.class ).getPathTrackingDataStore();
@@ -50,7 +53,27 @@ public class PathTracker
 
     private final PathTrackingInformation pathTrackingInformation;
 
-    public PathTracker( final PathTrackingInformation pathTrackingInformation )
+    private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
+        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.useexecutors", "false" ) );
+
+    private static boolean USE_SINGLE_STORE = Boolean.parseBoolean(
+        Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.singlestore", "false" ) );
+
+
+    protected static ExecutorService executorService;
+
+    static
+    {
+
+        if ( USE_EXECUTORS )
+        {
+            int threadsNumber =
+                Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.executors", 5 );
+            executorService = Executors.newFixedThreadPool( threadsNumber );
+        }
+    }
+
+    private PathTracker( final PathTrackingInformation pathTrackingInformation )
     {
         this.pathTrackingInformation = pathTrackingInformation;
     }
@@ -189,6 +212,7 @@ public class PathTracker
         return new PathTracker( pathTrackingInformation );
     }
 
+
     public void stop()
     {
         final long end = System.nanoTime();
@@ -209,14 +233,50 @@ public class PathTracker
             new PathTrackingEntry( uuid, NODE, pathTrackingInformation.getClassName(),
                                    pathTrackingInformation.getMethodName(), start, ( end - start ),
                                    pathTrackingInformation.getLevel() );
-
-        context.entries.add( pathTrackingEntry );
+        if ( USE_SINGLE_STORE )
+        {
+            PATH_TRACKING_DATA_STORE.store( pathTrackingEntry );
+        }
+        else
+        {
+            context.entries.add( pathTrackingEntry );
+        }
 
         if ( pathTrackingInformation.getLevel() == 1 && pathTrackingInformation.getParent() == null )
         { // 0 is never reached so 1 is first
-            PATH_TRACKING_DATA_STORE.store( context.entries );
-            PathTracker.cleanUp();
+            if (!USE_SINGLE_STORE)
+            {
+                Runnable runnable = new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        PATH_TRACKING_DATA_STORE.store( context.entries );
+                        PathTracker.cleanUp();
+                    }
+                };
+                if ( USE_EXECUTORS )
+                {
+                    executorService.submit( runnable );
+                }
+                else
+                {
+                    runnable.run();
+                }
+            }
         }
     }
 
+    @Destroying
+    public void destroy()
+    {
+        PathTracker.shutdown();
+    }
+
+    public static void shutdown()
+    {
+        executorService.shutdownNow();
+    }
+
+
 }