You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/06 01:02:45 UTC
svn commit: r1574725 - in /incubator/sirona/trunk:
agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/
agent/store/cube/src/main/java/org/apache/sirona/cube/
core/src/main/java/org/apache/sirona/store/tracking/
core/src/main/java/org/apache...
Author: olamy
Date: Thu Mar 6 00:02:45 2014
New Revision: 1574725
URL: http://svn.apache.org/r1574725
Log:
Use Unsafe (off-heap memory) to store path tracking entries
Modified:
incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java
Modified: incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java (original)
+++ incubator/sirona/trunk/agent/javaagent/src/test/java/org/apache/test/sirona/javaagent/PathTrackingInvocationListenerTest.java Thu Mar 6 00:02:45 2014
@@ -21,6 +21,7 @@ import org.apache.sirona.javaagent.Agent
import org.apache.sirona.javaagent.JavaAgentRunner;
import org.apache.sirona.pathtracking.test.ExtendedInMemoryPathTrackingDataStore;
import org.apache.sirona.store.DataStoreFactory;
+import org.apache.sirona.tracking.PathTracker;
import org.apache.sirona.tracking.PathTrackingEntry;
import org.junit.Assert;
import org.junit.Test;
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Thu Mar 6 00:02:45 2014
@@ -160,30 +160,26 @@ public class Cube {
connection.setUseCaches(false);
connection.setDoInput(true);
connection.setDoOutput(true);
-
+ connection.setReadTimeout( config.getPostTimeout() );
OutputStream output = null;
+
+ output = connection.getOutputStream();
try {
- output = connection.getOutputStream();
- try {
- // FIXME find a more efficient way to prevent to have all of this in memory
- output.write( bytes );
- output.flush();
-
- final int status = connection.getResponseCode();
- if (status / 100 != 2) {
- LOGGER.warning("Pushed data but response code is: " + status);
- }
- } finally {
- if (output != null) {
- output.close();
- }
+ // FIXME find a more efficient way to avoid holding all of this in memory
+ output.write( bytes );
+ output.flush();
+
+ final int status = connection.getResponseCode();
+ if (status / 100 != 2) {
+ LOGGER.warning("Pushed data but response code is: " + status);
}
} finally {
if (output != null) {
output.close();
}
}
+
} catch (final Exception e) {
LOGGER.log(Level.WARNING, "Can't post data to collector", e);
}
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeBuilder.java Thu Mar 6 00:02:45 2014
@@ -48,8 +48,17 @@ public class CubeBuilder {
private String sslKeyStorePassword;
private String sslKeyStoreProvider;
private String basicHeader; // user:pwd
+
+ /**
+ * compression is off by default
+ */
private boolean useCompression = false;
+ /**
+ * default post timeout in milliseconds (5000 ms = 5s)
+ */
+ private int postTimeout = 5000;
+
public synchronized Cube build() {
if (marker == null) {
marker = Localhosts.get();
@@ -110,6 +119,16 @@ public class CubeBuilder {
this.useCompression = useCompression;
}
+ public int getPostTimeout()
+ {
+ return postTimeout;
+ }
+
+ public void setPostTimeout( int postTimeout )
+ {
+ this.postTimeout = postTimeout;
+ }
+
private TrustManager[] createTrustManager() {
if (sslTrustStore == null) {
return null;
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java Thu Mar 6 00:02:45 2014
@@ -17,6 +17,8 @@
package org.apache.sirona.cube;
+import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Destroying;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
@@ -25,6 +27,8 @@ import org.apache.sirona.tracking.PathTr
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
*
@@ -36,25 +40,65 @@ public class CubePathTrackingDataStore
private final Cube cube = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
+ private static final boolean useExecutors = Boolean.parseBoolean(
+ Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.useexecutors", "false" ) );
+
+
+ protected static ExecutorService executorService;
+
+ static
+ {
+
+ if ( useExecutors )
+ {
+ int threadsNumber =
+ Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.post.executors", 5 );
+ executorService = Executors.newFixedThreadPool( threadsNumber );
+
+ }
+ }
+
@Override
public void store( Collection<PathTrackingEntry> pathTrackingEntries )
{
- for (PathTrackingEntry pathTrackingEntry : pathTrackingEntries)
+
+ for ( final PathTrackingEntry pathTrackingEntry : pathTrackingEntries )
{
- cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
+ Runnable runnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
+ }
+ };
+ if ( useExecutors )
+ {
+ executorService.submit( runnable );
+ }
+ else
+ {
+ runnable.run();
+ }
+
}
}
@Override
- protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries ) {
- for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet())
+ protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries )
+ {
+ for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet() )
{
- for (PathTrackingEntry pathTrackingEntry : entry.getValue())
+ for ( PathTrackingEntry pathTrackingEntry : entry.getValue() )
{
cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
}
}
}
-
+ @Destroying
+ public void destroy()
+ {
+ executorService.shutdownNow();
+ }
}
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java Thu Mar 6 00:02:45 2014
@@ -27,7 +27,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -50,9 +49,10 @@ public class InMemoryPathTrackingDataSto
{
/**
* store path track tracking entries list per path tracking id
+ * the value is a list of pointers, each holding an off-heap memory address and the entry size in bytes
*/
- private ConcurrentMap<String, List<ByteBuffer>> pathTrackingEntries =
- new ConcurrentHashMap<String, List<ByteBuffer>>( 50 );
+ private ConcurrentMap<String, List<Pointer>> pathTrackingEntries =
+ new ConcurrentHashMap<String, List<Pointer>>( 50 );
@Override
public void store( PathTrackingEntry pathTrackingEntry )
@@ -84,11 +84,11 @@ public class InMemoryPathTrackingDataSto
for ( Map.Entry<String, Set<PathTrackingEntry>> entry : entries.entrySet() )
{
- List<ByteBuffer> entriesList = this.pathTrackingEntries.get( entry.getKey() );
+ List<Pointer> entriesList = this.pathTrackingEntries.get( entry.getKey() );
if ( entriesList == null )
{
- entriesList = new ArrayList<ByteBuffer>();
+ entriesList = new ArrayList<Pointer>();
}
entriesList.addAll( serialize( entry.getValue() ) );
this.pathTrackingEntries.put( entry.getKey(), entriesList );
@@ -99,7 +99,7 @@ public class InMemoryPathTrackingDataSto
@Override
public Collection<PathTrackingEntry> retrieve( String trackingId )
{
- List<ByteBuffer> buffers = this.pathTrackingEntries.get( trackingId );
+ List<Pointer> buffers = this.pathTrackingEntries.get( trackingId );
return deserialize( buffers );
}
@@ -108,14 +108,14 @@ public class InMemoryPathTrackingDataSto
public Collection<String> retrieveTrackingIds( Date startTime, Date endTime )
{
List<String> trackingIds = new ArrayList<String>();
- for ( List<ByteBuffer> buffers : this.pathTrackingEntries.values() )
+ for ( List<Pointer> buffers : this.pathTrackingEntries.values() )
{
if ( pathTrackingEntries.isEmpty() )
{
continue;
}
- PathTrackingEntry first = deserialize( buffers.iterator().next().array() );
+ PathTrackingEntry first = deserialize( readBytes( buffers.iterator().next() ) );
if ( first.getStartTime() / 1000000 > startTime.getTime() //
&& first.getStartTime() / 1000000 < endTime.getTime() )
@@ -126,16 +126,13 @@ public class InMemoryPathTrackingDataSto
return trackingIds;
}
- private Collection<PathTrackingEntry> deserialize( List<ByteBuffer> buffers )
+ private Collection<PathTrackingEntry> deserialize( List<Pointer> buffers )
{
List<PathTrackingEntry> entries = new ArrayList<PathTrackingEntry>( buffers.size() );
- for ( ByteBuffer byteBuffer : buffers )
+ for ( Pointer pointer : buffers )
{
- byteBuffer.rewind();
- int size = byteBuffer.remaining();
- byte[] bytes = new byte[size];
- byteBuffer.get( bytes, 0, size );
+ byte[] bytes = readBytes( pointer );
PathTrackingEntry entry = deserialize( bytes );
if ( entry != null )
@@ -147,18 +144,44 @@ public class InMemoryPathTrackingDataSto
return entries;
}
- private List<ByteBuffer> serialize( Collection<PathTrackingEntry> entries )
+ private byte[] readBytes( Pointer pointer )
{
- List<ByteBuffer> buffers = new ArrayList<ByteBuffer>( entries.size() );
+ byte[] bytes = new byte[pointer.size];
+ int length = pointer.size;
+ long offset = pointer.offheapPointer;
+ for ( int pos = 0; pos < length; pos++ )
+ {
+ bytes[pos] = getUnsafe().getByte( pos + offset );
+ }
+ return bytes;
+ }
+
+ private static class Pointer
+ {
+ int size;
+
+ long offheapPointer;
+ }
+
+ private List<Pointer> serialize( Collection<PathTrackingEntry> entries )
+ {
+ List<Pointer> buffers = new ArrayList<Pointer>( entries.size() );
for ( PathTrackingEntry entry : entries )
{
byte[] bytes = serialize( entry );
if ( bytes != null )
{
- ByteBuffer buffer = ByteBuffer.allocateDirect( bytes.length );
- buffer.put( bytes );
- buffers.add( buffer );
+ long offheapPointer = getUnsafe().allocateMemory( bytes.length );
+ Pointer pointer = new Pointer();
+ pointer.offheapPointer = offheapPointer;
+ pointer.size = bytes.length;
+ for ( int i = 0, size = bytes.length; i < size; i++ )
+ {
+ getUnsafe().putByte( offheapPointer + i, bytes[i] );
+ }
+ buffers.add( pointer );
+
}
}
@@ -168,15 +191,15 @@ public class InMemoryPathTrackingDataSto
@Override
public void clearEntries()
{
- for ( Map.Entry<String, List<ByteBuffer>> entry : pathTrackingEntries.entrySet() )
+ for ( Map.Entry<String, List<Pointer>> entry : pathTrackingEntries.entrySet() )
{
// clear entries to not wait gc
- for ( ByteBuffer byteBuffer : entry.getValue() )
+ for ( Pointer pointer : entry.getValue() )
{
- byteBuffer.clear();
+ getUnsafe().freeMemory( pointer.offheapPointer );
}
}
- pathTrackingEntries = new ConcurrentHashMap<String, List<ByteBuffer>>( 50 );
+ pathTrackingEntries = new ConcurrentHashMap<String, List<Pointer>>( 50 );
}
protected Map<String, Set<PathTrackingEntry>> getPathTrackingEntries()
@@ -185,7 +208,7 @@ public class InMemoryPathTrackingDataSto
Map<String, Set<PathTrackingEntry>> entries =
new HashMap<String, Set<PathTrackingEntry>>( this.pathTrackingEntries.size() );
- for ( Map.Entry<String, List<ByteBuffer>> entry : this.pathTrackingEntries.entrySet() )
+ for ( Map.Entry<String, List<Pointer>> entry : this.pathTrackingEntries.entrySet() )
{
Set<PathTrackingEntry> pathTrackingEntries =
new TreeSet<PathTrackingEntry>( PathTrackingEntryComparator.INSTANCE );
@@ -241,6 +264,7 @@ public class InMemoryPathTrackingDataSto
// ignore as should not happen anyway log the stack trace
e.printStackTrace();
}
+
return null;
}
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java?rev=1574725&r1=1574724&r2=1574725&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/tracking/PathTracker.java Thu Mar 6 00:02:45 2014
@@ -18,6 +18,7 @@ package org.apache.sirona.tracking;
import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Destroying;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.store.DataStoreFactory;
import org.apache.sirona.store.tracking.PathTrackingDataStore;
@@ -25,6 +26,8 @@ import org.apache.sirona.store.tracking.
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -32,9 +35,9 @@ import java.util.concurrent.atomic.Atomi
*/
public class PathTracker
{
- private static final String NODE = Configuration.getProperty(
- Configuration.CONFIG_PROPERTY_PREFIX + "javaagent.path.tracking.marker", //
- Configuration.getProperty( "org.apache.sirona.cube.CubeBuilder.marker", "node" ));
+ private static final String NODE =
+ Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "javaagent.path.tracking.marker", //
+ Configuration.getProperty( "org.apache.sirona.cube.CubeBuilder.marker", "node" ) );
private static final PathTrackingDataStore PATH_TRACKING_DATA_STORE =
IoCs.findOrCreateInstance( DataStoreFactory.class ).getPathTrackingDataStore();
@@ -50,7 +53,27 @@ public class PathTracker
private final PathTrackingInformation pathTrackingInformation;
- public PathTracker( final PathTrackingInformation pathTrackingInformation )
+ private static final boolean USE_EXECUTORS = Boolean.parseBoolean(
+ Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.useexecutors", "false" ) );
+
+ private static boolean USE_SINGLE_STORE = Boolean.parseBoolean(
+ Configuration.getProperty( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.singlestore", "false" ) );
+
+
+ protected static ExecutorService executorService;
+
+ static
+ {
+
+ if ( USE_EXECUTORS )
+ {
+ int threadsNumber =
+ Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + "pathtracking.executors", 5 );
+ executorService = Executors.newFixedThreadPool( threadsNumber );
+ }
+ }
+
+ private PathTracker( final PathTrackingInformation pathTrackingInformation )
{
this.pathTrackingInformation = pathTrackingInformation;
}
@@ -189,6 +212,7 @@ public class PathTracker
return new PathTracker( pathTrackingInformation );
}
+
public void stop()
{
final long end = System.nanoTime();
@@ -209,14 +233,50 @@ public class PathTracker
new PathTrackingEntry( uuid, NODE, pathTrackingInformation.getClassName(),
pathTrackingInformation.getMethodName(), start, ( end - start ),
pathTrackingInformation.getLevel() );
-
- context.entries.add( pathTrackingEntry );
+ if ( USE_SINGLE_STORE )
+ {
+ PATH_TRACKING_DATA_STORE.store( pathTrackingEntry );
+ }
+ else
+ {
+ context.entries.add( pathTrackingEntry );
+ }
if ( pathTrackingInformation.getLevel() == 1 && pathTrackingInformation.getParent() == null )
{ // 0 is never reached so 1 is first
- PATH_TRACKING_DATA_STORE.store( context.entries );
- PathTracker.cleanUp();
+ if (!USE_SINGLE_STORE)
+ {
+ Runnable runnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ PATH_TRACKING_DATA_STORE.store( context.entries );
+ PathTracker.cleanUp();
+ }
+ };
+ if ( USE_EXECUTORS )
+ {
+ executorService.submit( runnable );
+ }
+ else
+ {
+ runnable.run();
+ }
+ }
}
}
+ @Destroying
+ public void destroy()
+ {
+ PathTracker.shutdown();
+ }
+
+ public static void shutdown()
+ {
+ executorService.shutdownNow();
+ }
+
+
}