You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/06 01:03:18 UTC

svn commit: r1574727 - in /incubator/sirona/trunk: agent/store/cube/src/main/java/org/apache/sirona/cube/ core/src/main/java/org/apache/sirona/store/tracking/ core/src/main/java/org/apache/sirona/util/ server/collector/src/main/java/org/apache/sirona/c...

Author: olamy
Date: Thu Mar  6 00:03:18 2014
New Revision: 1574727

URL: http://svn.apache.org/r1574727
Log:
post PathTracking entries as bytes to the Collector to avoid a huge amount of String creation

Added:
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java   (with props)
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java   (with props)
Modified:
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
    incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java
    incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java
    incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Thu Mar  6 00:03:18 2014
@@ -87,6 +87,8 @@ public class Cube {
     private static final String CONTENT_LENGTH = "Content-Length";
     private static final String GZIP_CONTENT_ENCODING = "gzip";
     private static final String CONTENT_ENCODING = "Content-Encoding";
+    private static final String APPLICATION_JAVA_OBJECT = "application/x-java-serialized-object";
+    private static final String X_SIRONA_CLASSNAME = "X-Sirona-ClassName";
 
     private static final String JS_ISO_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
     private static final String UTC = "UTC";
@@ -130,6 +132,58 @@ public class Cube {
         }
     }
 
+    public void postBytes(byte[] bytes, String className)
+    {
+        try {
+            final URL url = new URL(config.getCollector());
+
+            final HttpURLConnection connection = HttpURLConnection.class.cast(url.openConnection(proxy));
+
+            final SSLSocketFactory socketFactory = config.getSocketFactory();
+            if (socketFactory != null && "https".equals(url.getProtocol())) {
+                HttpsURLConnection.class.cast(connection).setSSLSocketFactory(socketFactory);
+            }
+
+            final String auth = config.getBasicHeader();
+            if (auth != null) {
+                connection.setRequestProperty("Authorization", auth);
+            }
+
+
+            boolean useCompression = config.isUseCompression();
+
+            connection.setRequestMethod(POST);
+            connection.setRequestProperty(CONTENT_TYPE, APPLICATION_JAVA_OBJECT);
+            connection.setRequestProperty( X_SIRONA_CLASSNAME, className );
+            connection.setRequestProperty(CONTENT_LENGTH, Long.toString(bytes.length));
+            connection.setUseCaches(false);
+            connection.setDoInput(true);
+            connection.setDoOutput(true);
+            connection.setReadTimeout( config.getPostTimeout() );
+            OutputStream output = null;
+
+
+            output = connection.getOutputStream();
+            try {
+                // FIXME find a more efficient way to avoid holding all of this in memory
+                output.write( bytes );
+                output.flush();
+
+                final int status = connection.getResponseCode();
+                if (status / 100 != 2) {
+                    LOGGER.warning("Pushed data but response code is: " + status);
+                }
+            } finally {
+                if (output != null) {
+                    output.close();
+                }
+            }
+
+        } catch (final Exception e) {
+            LOGGER.log(Level.WARNING, "Can't post data to collector", e);
+        }
+    }
+
     private void doPost(final String payload) {
         try {
             final URL url = new URL(config.getCollector());

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java Thu Mar  6 00:03:18 2014
@@ -25,8 +25,8 @@ import org.apache.sirona.store.tracking.
 import org.apache.sirona.tracking.PathTrackingEntry;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -59,39 +59,13 @@ public class CubePathTrackingDataStore
     }
 
     @Override
-    public void store( Collection<PathTrackingEntry> pathTrackingEntries )
+    protected void pushEntriesByBatch( Map<String, List<Pointer>> pathTrackingEntries )
     {
-
-        for ( final PathTrackingEntry pathTrackingEntry : pathTrackingEntries )
-        {
-            Runnable runnable = new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
-                }
-            };
-            if ( useExecutors )
-            {
-                executorService.submit( runnable );
-            }
-            else
-            {
-                runnable.run();
-            }
-
-        }
-    }
-
-    @Override
-    protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries )
-    {
-        for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet() )
+        for ( Map.Entry<String, List<Pointer>> entry : pathTrackingEntries.entrySet() )
         {
-            for ( PathTrackingEntry pathTrackingEntry : entry.getValue() )
+            for ( Pointer pointer : entry.getValue() )
             {
-                cube.post( cube.pathTrackingSnapshot( pathTrackingEntry ) );
+                cube.postBytes( readBytes( pointer ), PathTrackingEntry.class.getName() );
             }
         }
     }

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java Thu Mar  6 00:03:18 2014
@@ -23,6 +23,7 @@ import org.apache.sirona.store.BatchFutu
 import org.apache.sirona.tracking.PathTrackingEntry;
 import org.apache.sirona.util.DaemonThreadFactory;
 
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -79,7 +80,7 @@ public abstract class BatchPathTrackingD
         {
             try
             {
-                pushEntriesByBatch( getPathTrackingEntries() );
+                pushEntriesByBatch( getPointers() );
                 clearEntries();
             }
             catch ( final Exception e )
@@ -89,6 +90,6 @@ public abstract class BatchPathTrackingD
         }
     }
 
-    protected abstract void pushEntriesByBatch( final Map<String, Set<PathTrackingEntry>> pathTrackingEntries );
+    protected abstract void pushEntriesByBatch( final Map<String, List<Pointer>> pathTrackingEntries );
 
 }

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java Thu Mar  6 00:03:18 2014
@@ -18,15 +18,9 @@ package org.apache.sirona.store.tracking
 
 import org.apache.sirona.tracking.PathTrackingEntry;
 import org.apache.sirona.tracking.PathTrackingEntryComparator;
-import sun.misc.Unsafe;
+import org.apache.sirona.util.SerializeUtils;
+import org.apache.sirona.util.UnsafeUtils;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,23 +42,8 @@ public class InMemoryPathTrackingDataSto
     implements PathTrackingDataStore, CollectorPathTrackingDataStore
 {
 
-    private static Unsafe UNSAFE;
 
 
-    static
-    {
-        try
-        {
-            Field f = Unsafe.class.getDeclaredField( "theUnsafe" );
-            f.setAccessible( true );
-            UNSAFE = (Unsafe) f.get( null );
-        }
-        catch ( Exception e )
-        {
-            throw new RuntimeException( e.getMessage(), e );
-        }
-    }
-
     /**
      * store path track tracking entries list per path tracking id
      * the value is the memory address
@@ -133,7 +112,8 @@ public class InMemoryPathTrackingDataSto
                 continue;
             }
 
-            PathTrackingEntry first = deserialize( readBytes( buffers.iterator().next() ) );
+            PathTrackingEntry first =
+                SerializeUtils.deserialize( readBytes( buffers.iterator().next() ), PathTrackingEntry.class );
 
             if ( first.getStartTime() / 1000000 > startTime.getTime() //
                 && first.getStartTime() / 1000000 < endTime.getTime() )
@@ -152,7 +132,7 @@ public class InMemoryPathTrackingDataSto
         {
             byte[] bytes = readBytes( pointer );
 
-            PathTrackingEntry entry = deserialize( bytes );
+            PathTrackingEntry entry = SerializeUtils.deserialize( bytes, PathTrackingEntry.class );
             if ( entry != null )
             {
                 entries.add( entry );
@@ -162,23 +142,33 @@ public class InMemoryPathTrackingDataSto
         return entries;
     }
 
-    private byte[] readBytes( Pointer pointer )
+    public byte[] readBytes( Pointer pointer )
     {
         byte[] bytes = new byte[pointer.size];
         int length = pointer.size;
         long offset = pointer.offheapPointer;
         for ( int pos = 0; pos < length; pos++ )
         {
-            bytes[pos] = UNSAFE.getByte( pos + offset );
+            bytes[pos] = UnsafeUtils.getUnsafe().getByte( pos + offset );
         }
         return bytes;
     }
 
-    private static class Pointer
+    public static class Pointer
     {
         int size;
 
         long offheapPointer;
+
+        public int getSize()
+        {
+            return size;
+        }
+
+        public long getOffheapPointer()
+        {
+            return offheapPointer;
+        }
     }
 
     private List<Pointer> serialize( Collection<PathTrackingEntry> entries )
@@ -187,16 +177,16 @@ public class InMemoryPathTrackingDataSto
 
         for ( PathTrackingEntry entry : entries )
         {
-            byte[] bytes = serialize( entry );
+            byte[] bytes = SerializeUtils.serialize( entry );
             if ( bytes != null )
             {
-                long offheapPointer = UNSAFE.allocateMemory( bytes.length );
+                long offheapPointer = UnsafeUtils.getUnsafe().allocateMemory( bytes.length );
                 Pointer pointer = new Pointer();
                 pointer.offheapPointer = offheapPointer;
                 pointer.size = bytes.length;
                 for ( int i = 0, size = bytes.length; i < size; i++ )
                 {
-                    UNSAFE.putByte( offheapPointer + i, bytes[i] );
+                    UnsafeUtils.getUnsafe().putByte( offheapPointer + i, bytes[i] );
                 }
                 buffers.add( pointer );
 
@@ -214,7 +204,7 @@ public class InMemoryPathTrackingDataSto
             // clear entries to not wait gc
             for ( Pointer pointer : entry.getValue() )
             {
-                UNSAFE.freeMemory( pointer.offheapPointer );
+                UnsafeUtils.getUnsafe().freeMemory( pointer.offheapPointer );
             }
         }
         pathTrackingEntries = new ConcurrentHashMap<String, List<Pointer>>( 50 );
@@ -237,53 +227,14 @@ public class InMemoryPathTrackingDataSto
         return entries;
     }
 
-
-    private byte[] serialize( PathTrackingEntry pathTrackingEntry )
-    {
-        try
-        {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream( baos );
-            oos.writeObject( pathTrackingEntry );
-            oos.flush();
-            oos.close();
-            return baos.toByteArray();
-        }
-        catch ( IOException e )
-        {
-            // ignore as should not happen anyway log the stack trace
-            e.printStackTrace();
-        }
-        return null;
-    }
-
-    public PathTrackingEntry deserialize( byte[] source )
+    /**
+     * Direct access to the underlying data, not a copy.
+     *
+     * @return
+     */
+    protected Map<String, List<Pointer>> getPointers()
     {
-        try
-        {
-            ByteArrayInputStream bis = new ByteArrayInputStream( source );
-            ObjectInputStream ois = new ObjectInputStream( bis )
-            {
-
-                @Override
-                protected Class<?> resolveClass( ObjectStreamClass objectStreamClass )
-                    throws IOException, ClassNotFoundException
-                {
-                    return PathTrackingEntry.class;
-                }
-
-            };
-            PathTrackingEntry obj = PathTrackingEntry.class.cast( ois.readObject() );
-            ois.close();
-            return obj;
-        }
-        catch ( Exception e )
-        {
-            // ignore as should not happen anyway log the stack trace
-            e.printStackTrace();
-        }
-
-        return null;
+        return this.pathTrackingEntries;
     }
 
 

Added: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java?rev=1574727&view=auto
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java (added)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java Thu Mar  6 00:03:18 2014
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.sirona.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ *
+ */
+public class SerializeUtils
+{
+    private SerializeUtils()
+    {
+        // no-op: just a helper class
+    }
+
+
+    public static byte[] serialize( Object object )
+    {
+        try
+        {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream( baos );
+            oos.writeObject( object );
+            oos.flush();
+            oos.close();
+            return baos.toByteArray();
+        }
+        catch ( IOException e )
+        {
+            // ignored, as this should not happen; log the stack trace anyway
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    public static <T> T deserialize( byte[] source, final Class<T> tClass )
+    {
+        try
+        {
+            ByteArrayInputStream bis = new ByteArrayInputStream( source );
+            ObjectInputStream ois = new ObjectInputStream( bis )
+            {
+
+                @Override
+                protected Class<?> resolveClass( ObjectStreamClass objectStreamClass )
+                    throws IOException, ClassNotFoundException
+                {
+                    return tClass;
+                }
+
+            };
+            T obj = tClass.cast( ois.readObject() );
+            ois.close();
+            return obj;
+        }
+        catch ( Exception e )
+        {
+            // ignored, as this should not happen; log the stack trace anyway
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+}

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/SerializeUtils.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Added: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java?rev=1574727&view=auto
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java (added)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java Thu Mar  6 00:03:18 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.util;
+
+import sun.misc.Unsafe;
+
+import java.lang.reflect.Field;
+
+/**
+ *
+ */
+public class UnsafeUtils
+{
+    private static Unsafe UNSAFE;
+
+
+    static
+    {
+        try
+        {
+            Field f = Unsafe.class.getDeclaredField( "theUnsafe" );
+            f.setAccessible( true );
+            UNSAFE = (Unsafe) f.get( null );
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException( e.getMessage(), e );
+        }
+    }
+
+    private UnsafeUtils()
+    {
+        // no op
+    }
+
+    public static Unsafe getUnsafe()
+    {
+        return UNSAFE;
+    }
+}

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/util/UnsafeUtils.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java (original)
+++ incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java Thu Mar  6 00:03:18 2014
@@ -40,6 +40,7 @@ import org.apache.sirona.store.status.No
 import org.apache.sirona.store.tracking.CollectorPathTrackingDataStore;
 import org.apache.sirona.tracking.PathTrackingEntry;
 import org.apache.sirona.util.DaemonThreadFactory;
+import org.apache.sirona.util.SerializeUtils;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
@@ -49,6 +50,7 @@ import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -84,6 +86,10 @@ public class Collector extends HttpServl
     private static final String PATH_TRACKING = "pathtracking";
 
     private static final String CONTENT_ENCODING = "Content-Encoding";
+    private static final String CONTENT_TYPE = "Content-Type";
+    private static final String APPLICATION_JAVA_OBJECT = "application/x-java-serialized-object";
+    private static final String X_SIRONA_CLASSNAME = "X-Sirona-ClassName";
+
 
     private static final String GET = "GET";
 
@@ -196,11 +202,22 @@ public class Collector extends HttpServl
 
         final ServletInputStream inputStream = req.getInputStream();
         try {
-            if ("gzip".equals( req.getHeader( CONTENT_ENCODING ) ))
+            if (APPLICATION_JAVA_OBJECT.equals( req.getHeader( CONTENT_TYPE ) )) {
+                if (PathTrackingEntry.class.getName().equals( req.getHeader( X_SIRONA_CLASSNAME ) )) {
+                    int length = req.getContentLength();
+                    updatePathTracking( readBytes( req.getInputStream(), length ) );
+                }
+            }
+            else
             {
-                slurpEvents(new GZIPInputStream( inputStream ));
-            } else {
-                slurpEvents(inputStream);
+                if ( "gzip".equals( req.getHeader( CONTENT_ENCODING ) ) )
+                {
+                    slurpEvents( new GZIPInputStream( inputStream ) );
+                }
+                else
+                {
+                    slurpEvents( inputStream );
+                }
             }
         } catch (final SironaException me) {
             resp.setStatus(HttpURLConnection.HTTP_BAD_REQUEST);
@@ -212,6 +229,24 @@ public class Collector extends HttpServl
         resp.getWriter().write(OK);
     }
 
+    private byte[] readBytes(ServletInputStream servletInputStream, int length)
+        throws IOException
+    {
+        byte[] bytes = new byte[length];
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream(length);
+
+        int nRead;
+
+        while ((nRead = servletInputStream.read(bytes, 0, bytes.length)) != -1) {
+            buffer.write(bytes, 0, nRead);
+        }
+
+        buffer.flush();
+
+        return buffer.toByteArray();
+
+    }
+
     private void slurpEvents(final InputStream inputStream) throws IOException {
         final Event[] events = mapper.readValue(inputStream, Event[].class);
         if (events != null && events.length > 0) {
@@ -312,6 +347,12 @@ public class Collector extends HttpServl
     }
 
 
+    private void updatePathTracking(final byte[] bytes) {
+
+        pathTrackingDataStore.store( SerializeUtils.deserialize( bytes, PathTrackingEntry.class ) );
+    }
+
+
 
     private void updateCounter(final Event event) {
         final Map<String, Object> data = event.getData();

Modified: incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java (original)
+++ incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java Thu Mar  6 00:03:18 2014
@@ -241,6 +241,10 @@ public class CollectorServer {
                             return "gzip";
                         }
 
+                        if ("getHeader".equals( method.getName()) && args[0].equals( "Content-Type" )) {
+                            return "foo";
+                        }
+
                         throw new UnsupportedOperationException("not implemented: " + method.getName() + " for args: " + Arrays.asList(args));
                     }
                 })),

Modified: incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java?rev=1574727&r1=1574726&r2=1574727&view=diff
==============================================================================
--- incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java Thu Mar  6 00:03:18 2014
@@ -202,9 +202,11 @@ public class CassandraPathTrackingDataSt
         return entries;
     }
 
-    @Override
-    protected void pushEntriesByBatch( Map<String, Set<PathTrackingEntry>> pathTrackingEntries )
+
+    protected void pushEntriesByBatch( Map<String, List<Pointer>> pathTrackingEntries )
     {
+        // TODO even if not really used
+        /*
         List<PathTrackingEntry> entries = new ArrayList<PathTrackingEntry>(  );
 
         for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet()) {
@@ -213,6 +215,7 @@ public class CassandraPathTrackingDataSt
 
 
         store( entries );
+        */
     }
 
     protected Keyspace getKeyspace()