You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/03/03 00:02:17 UTC

svn commit: r1573398 - in /incubator/sirona/trunk: ./ agent/store/cube/ agent/store/cube/src/main/java/org/apache/sirona/cube/ agent/store/cube/src/test/java/org/apache/sirona/cube/ server/collector/ server/collector/src/main/java/org/apache/sirona/col...

Author: olamy
Date: Sun Mar  2 23:02:16 2014
New Revision: 1573398

URL: http://svn.apache.org/r1573398
Log:
use gzip compression to transport data

Added:
    incubator/sirona/trunk/server/collector/src/test/sirona.properties   (with props)
Modified:
    incubator/sirona/trunk/agent/store/cube/pom.xml
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
    incubator/sirona/trunk/agent/store/cube/src/test/java/org/apache/sirona/cube/CubeServer.java
    incubator/sirona/trunk/pom.xml
    incubator/sirona/trunk/server/collector/pom.xml
    incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java
    incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java

Modified: incubator/sirona/trunk/agent/store/cube/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/pom.xml?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/pom.xml (original)
+++ incubator/sirona/trunk/agent/store/cube/pom.xml Sun Mar  2 23:02:16 2014
@@ -42,6 +42,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-codec-http</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.jcraft</groupId>
+      <artifactId>jzlib</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Sun Mar  2 23:02:16 2014
@@ -24,6 +24,10 @@ import org.apache.sirona.tracking.PathTr
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
@@ -35,13 +39,12 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.zip.GZIPOutputStream;
 
 public class Cube {
     private static final Logger LOGGER = Logger.getLogger(Cube.class.getName());
@@ -82,6 +85,8 @@ public class Cube {
     private static final String CONTENT_TYPE = "Content-Type";
     private static final String APPLICATION_JSON = "application/json";
     private static final String CONTENT_LENGTH = "Content-Length";
+    private static final String GZIP_CONTENT_ENCODING = "gzip";
+    private static final String CONTENT_ENCODING = "Content-Encoding";
 
     private static final String JS_ISO_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
     private static final String UTC = "UTC";
@@ -91,6 +96,8 @@ public class Cube {
 
     private final BlockingQueue<DateFormat> isoDateFormatters;
 
+
+
     public Cube(final CubeBuilder cubeBuilder) {
         config = cubeBuilder;
         if (config.getProxyHost() != null) {
@@ -139,9 +146,12 @@ public class Cube {
                 connection.setRequestProperty("Authorization", auth);
             }
 
+            byte[] bytes = gzipCompression( payload.getBytes() );
+
             connection.setRequestMethod(POST);
             connection.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON);
-            connection.setRequestProperty(CONTENT_LENGTH, Long.toString(payload.length()));
+            connection.setRequestProperty( CONTENT_ENCODING, GZIP_CONTENT_ENCODING );
+            connection.setRequestProperty(CONTENT_LENGTH, Long.toString(bytes.length));
             connection.setUseCaches(false);
             connection.setDoInput(true);
             connection.setDoOutput(true);
@@ -149,7 +159,8 @@ public class Cube {
             try {
                 final OutputStream output = connection.getOutputStream();
                 try {
-                    output.write(payload.getBytes());
+                    // FIXME find a more efficient way to avoid holding all of this in memory
+                    output.write( bytes );
                     output.flush();
 
                     final int status = connection.getResponseCode();
@@ -171,6 +182,33 @@ public class Cube {
         }
     }
 
+    private static byte[] gzipCompression( byte[] unCompress )
+        throws IOException
+    {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        GZIPOutputStream out = new GZIPOutputStream( buffer );
+        out.write( unCompress );
+        out.finish();
+        ByteArrayInputStream bais = new ByteArrayInputStream( buffer.toByteArray() );
+        byte[] res = toByteArray( bais );
+        return res;
+   }
+
+    public static byte[] toByteArray( InputStream input )
+        throws IOException
+    {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        byte[] buffer =  new byte[4096];
+
+        int n = 0;
+        while ( -1 != ( n = input.read( buffer ) ) )
+        {
+            output.write( buffer, 0, n );
+        }
+
+        return output.toByteArray();
+    }
+
     public static String finalPayload(final String events) {
         return '[' + events + ']';
     }

Modified: incubator/sirona/trunk/agent/store/cube/src/test/java/org/apache/sirona/cube/CubeServer.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/test/java/org/apache/sirona/cube/CubeServer.java?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/test/java/org/apache/sirona/cube/CubeServer.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/test/java/org/apache/sirona/cube/CubeServer.java Sun Mar  2 23:02:16 2014
@@ -27,8 +27,10 @@ import io.netty.channel.SimpleChannelInb
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.compression.JdkZlibDecoder;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpContentDecompressor;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
@@ -143,10 +145,11 @@ public class CubeServer {
 
             pipeline
                 .addLast("decoder", new HttpRequestDecoder())
+                .addLast("inflater", new HttpContentDecompressor())
                 .addLast("aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
                 .addLast("encoder", new HttpResponseEncoder())
                 .addLast("chunked-writer", new ChunkedWriteHandler())
-                .addLast("featured-mock-server", new RequestHandler(messages));
+                .addLast( "featured-mock-server", new RequestHandler( messages ) );
         }
     }
 
@@ -161,8 +164,9 @@ public class CubeServer {
         protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest fullHttpRequest) throws Exception {
             final ChannelFuture future;
             if (HttpMethod.POST.equals(fullHttpRequest.getMethod())) {
+                String message = fullHttpRequest.content().toString(Charset.defaultCharset());
                 synchronized (messages) {
-                    messages.add(fullHttpRequest.content().toString(Charset.defaultCharset()));
+                    messages.add(message);
                 }
                 final HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                 future = ctx.writeAndFlush(response);
@@ -170,7 +174,11 @@ public class CubeServer {
                 LOGGER.warning("Received " + fullHttpRequest.getMethod());
                 future = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
             }
+
             future.addListener(ChannelFutureListener.CLOSE);
         }
     }
+
+
+
 }
\ No newline at end of file

Modified: incubator/sirona/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/pom.xml?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/pom.xml (original)
+++ incubator/sirona/trunk/pom.xml Sun Mar  2 23:02:16 2014
@@ -401,6 +401,12 @@
       </dependency>
 
       <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>2.4</version>
+      </dependency>
+
+      <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-codec-http</artifactId>
         <version>4.0.16.Final</version>
@@ -408,6 +414,13 @@
       </dependency>
 
       <dependency>
+        <groupId>com.jcraft</groupId>
+        <artifactId>jzlib</artifactId>
+        <version>1.1.3</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
         <groupId>org.jboss.shrinkwrap.descriptors</groupId>
         <artifactId>shrinkwrap-descriptors-impl-javaee</artifactId>
         <version>2.0.0-alpha-5</version>

Modified: incubator/sirona/trunk/server/collector/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/pom.xml?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/server/collector/pom.xml (original)
+++ incubator/sirona/trunk/server/collector/pom.xml Sun Mar  2 23:02:16 2014
@@ -70,6 +70,10 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-codec-http</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.jcraft</groupId>
+      <artifactId>jzlib</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -99,7 +103,7 @@
           <port>${tomcatRunPort}</port>
           <path>${tomcatRunPath}</path>
           <systemProperties>
-            <org.apache.sirona.configuration.sirona.properties>${basedir}/sirona.properties</org.apache.sirona.configuration.sirona.properties>
+            <org.apache.sirona.configuration.sirona.properties>${basedir}/src/test/sirona.properties</org.apache.sirona.configuration.sirona.properties>
           </systemProperties>
         </configuration>
         <dependencies>

Modified: incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java (original)
+++ incubator/sirona/trunk/server/collector/src/main/java/org/apache/sirona/collector/server/Collector.java Sun Mar  2 23:02:16 2014
@@ -68,6 +68,7 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.zip.GZIPInputStream;
 
 // should work with cube clients, see cube module for details
 // Note: for this simple need we don't need JAXRS
@@ -82,6 +83,8 @@ public class Collector extends HttpServl
     private static final String REGISTRATION = "registration";
     private static final String PATH_TRACKING = "pathtracking";
 
+    private static final String CONTENT_ENCODING = "Content-Encoding";
+
     private static final String GET = "GET";
 
     private final Map<String, Role> roles = new ConcurrentHashMap<String, Role>();
@@ -193,7 +196,12 @@ public class Collector extends HttpServl
 
         final ServletInputStream inputStream = req.getInputStream();
         try {
-            slurpEvents(inputStream);
+            if ("gzip".equals( req.getHeader( CONTENT_ENCODING ) ))
+            {
+                slurpEvents(new GZIPInputStream( inputStream ));
+            } else {
+                slurpEvents(inputStream);
+            }
         } catch (final SironaException me) {
             resp.setStatus(HttpURLConnection.HTTP_BAD_REQUEST);
             resp.getWriter().write("{\"error\":\"" + me.getCause().getMessage().replace('\"', ' ') + "\"}");

Modified: incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java?rev=1573398&r1=1573397&r2=1573398&view=diff
==============================================================================
--- incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java (original)
+++ incubator/sirona/trunk/server/collector/src/test/java/org/apache/sirona/collector/server/CollectorServer.java Sun Mar  2 23:02:16 2014
@@ -29,6 +29,7 @@ import io.netty.channel.socket.SocketCha
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpContentDecompressor;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpRequestDecoder;
@@ -53,9 +54,11 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.ServerSocket;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.zip.GZIPOutputStream;
 
 public class CollectorServer {
     private static final Logger LOGGER = Logger.getLogger(CollectorServer.class.getName());
@@ -139,7 +142,8 @@ public class CollectorServer {
 
             pipeline
                 .addLast("decoder", new HttpRequestDecoder())
-                .addLast("aggregator", new HttpObjectAggregator(Integer.MAX_VALUE))
+                .addLast("inflater", new HttpContentDecompressor())
+                .addLast("aggregator", new HttpObjectAggregator( Integer.MAX_VALUE ) )
                 .addLast("encoder", new HttpResponseEncoder())
                 .addLast("chunked-writer", new ChunkedWriteHandler())
                 .addLast("featured-mock-server", new RequestHandler());
@@ -178,11 +182,40 @@ public class CollectorServer {
             }
         }
 
+
+        protected static byte[] gzipCompression( byte[] unCompress )
+            throws IOException
+        {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            GZIPOutputStream out = new GZIPOutputStream( buffer );
+            out.write( unCompress );
+            out.finish();
+            ByteArrayInputStream bais = new ByteArrayInputStream( buffer.toByteArray() );
+            byte[] res = toByteArray( bais );
+            return res;
+        }
+
+        public static byte[] toByteArray( InputStream input )
+            throws IOException
+        {
+            ByteArrayOutputStream output = new ByteArrayOutputStream();
+            byte[] buffer =  new byte[4096];
+
+            int n = 0;
+            while ( -1 != ( n = input.read( buffer ) ) )
+            {
+                output.write( buffer, 0, n );
+            }
+
+            return output.toByteArray();
+        }
+
         @Override
         protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest fullHttpRequest) throws Exception {
             final ChannelFuture future;
             if (HttpMethod.POST.equals(fullHttpRequest.getMethod())) {
-                final InputStream is = new ByteArrayInputStream(fullHttpRequest.content().toString(Charset.defaultCharset()).getBytes());
+                final InputStream is =
+                    new ByteArrayInputStream(gzipCompression(fullHttpRequest.content().toString(Charset.defaultCharset()).getBytes()));
 
                 final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -204,7 +237,11 @@ public class CollectorServer {
                             };
                         }
 
-                        throw new UnsupportedOperationException("not implemented");
+                        if ("getHeader".equals( method.getName()) && args[0].equals( "Content-Encoding" )) {
+                            return "gzip";
+                        }
+
+                        throw new UnsupportedOperationException("not implemented: " + method.getName() + " for args: " + Arrays.asList(args));
                     }
                 })),
                 HttpServletResponse.class.cast(Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { HttpServletResponse.class}, new InvocationHandler() {

Added: incubator/sirona/trunk/server/collector/src/test/sirona.properties
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/collector/src/test/sirona.properties?rev=1573398&view=auto
==============================================================================
--- incubator/sirona/trunk/server/collector/src/test/sirona.properties (added)
+++ incubator/sirona/trunk/server/collector/src/test/sirona.properties Sun Mar  2 23:02:16 2014
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.sirona.cassandra.CassandraBuilder.keyspace = SironaKeySpace
+org.apache.sirona.cassandra.CassandraBuilder.hosts = localhost:9160
+org.apache.sirona.store.DataStoreFactory = org.apache.sirona.cassandra.CassandraAgentDataStoreFactory
+org.apache.sirona.store.tracking.CollectorPathTrackingDataStore = org.apache.sirona.cassandra.pathtracking.CassandraPathTrackingDataStore
+
+
+org.apache.sirona.store.tracking.PathTrackingDataStore = org.apache.sirona.cassandra.pathtracking.CassandraPathTrackingDataStore
+org.apache.sirona.store.gauge.CollectorGaugeDataStore=org.apache.sirona.cassandra.collector.gauge.CassandraCollectorGaugeDataStore
+

Propchange: incubator/sirona/trunk/server/collector/src/test/sirona.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/sirona/trunk/server/collector/src/test/sirona.properties
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision