You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/02/27 05:08:50 UTC

svn commit: r1572430 - in /incubator/sirona/trunk: agent/store/cube/src/main/java/org/apache/sirona/cube/ core/src/main/java/org/apache/sirona/repositories/ core/src/main/java/org/apache/sirona/store/tracking/ server/store/cassandra/src/main/java/org/a...

Author: olamy
Date: Thu Feb 27 04:08:49 2014
New Revision: 1572430

URL: http://svn.apache.org/r1572430
Log:
start adding batch storing for PathTracking

Added:
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java   (contents, props changed)
      - copied, changed from r1572429, incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java   (with props)
Modified:
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
    incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java
    incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Thu Feb 27 04:08:49 2014
@@ -20,6 +20,7 @@ import org.apache.sirona.Role;
 import org.apache.sirona.counters.Counter;
 import org.apache.sirona.status.NodeStatus;
 import org.apache.sirona.status.ValidationResult;
+import org.apache.sirona.tracking.PathTrackingEntry;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
@@ -34,9 +35,11 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -47,6 +50,7 @@ public class Cube {
     private static final String GAUGE_TYPE = "gauge";
     private static final String VALIDATION_TYPE = "validation";
     private static final String STATUS_TYPE = "status";
+    private static final String PATHTRACKING_TYPE = "pathtracking";
 
     private static final String NAME = "name";
     private static final String ROLE = "role";
@@ -60,6 +64,20 @@ public class Cube {
     private static final String SUM = "sum";
     private static final String M_2 = "m2";
 
+    private static final String TRACKING_D = "trackingId";
+
+    private static final String NODE_ID = "nodeId";
+
+    private static final String CLASSNAME = "className";
+
+    private static final String METHOD_NAME = "methodName";
+
+    private static final String START_TIME = "startTime";
+
+    private static final String EXEC_TIME = "executionTime";
+
+    private static final String LEVEL = "level";
+
     private static final String JSON_BASE = "{" +
         "\"type\": \"%s\"," +
         "\"time\": \"%s\"," +
@@ -230,6 +248,32 @@ public class Cube {
         return events;
     }
 
+    public StringBuilder pathTrackingSnapshot( final ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries ) {
+        final StringBuilder events = newEventStream();
+        final long ts = System.currentTimeMillis();
+        for (final Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet() ) {
+
+            for (PathTrackingEntry pathTrackingEntry : entry.getValue()){
+
+                buildEvent( events, PATHTRACKING_TYPE, ts, new MapBuilder()
+                        .add( TRACKING_D, pathTrackingEntry.getTrackingId() )
+                        .add( NODE_ID, pathTrackingEntry.getNodeId() )
+                        .add( CLASSNAME, pathTrackingEntry.getClassName())
+                        .add( METHOD_NAME, pathTrackingEntry.getMethodName())
+                        .add( START_TIME, pathTrackingEntry.getStartTime())
+                        .add( EXEC_TIME, pathTrackingEntry.getExecutionTime())
+                        .add( LEVEL, pathTrackingEntry.getLevel())
+                        .map()
+                );
+
+            }
+
+        }
+
+
+        return events;
+    }
+
     public StringBuilder gaugeSnapshot(final StringBuilder base, final long time, final Role role, final double value) {
         return buildEvent(base, GAUGE_TYPE, time,
             new MapBuilder()
@@ -255,4 +299,6 @@ public class Cube {
         }
         return events;
     }
+
+
 }

Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java Thu Feb 27 04:08:49 2014
@@ -26,7 +26,6 @@ public class CubeDataStoreFactory extend
             IoCs.processInstance(new CubeCounterDataStore()),
             IoCs.processInstance(new CubeGaugeDataStore()),
             IoCs.processInstance(new CubeNodeStatusDataStore()),
-            // FIXME real implementation in Cube. olamy: not sure it's really supported
-            new InMemoryPathTrackingDataStore());
+            IoCs.processInstance(new CubePathTrackingDataStore()));
     }
 }

Copied: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (from r1572429, incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java)
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java?p2=incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java&p1=incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java&r1=1572429&r2=1572430&rev=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -14,19 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.sirona.cube;
 
 import org.apache.sirona.configuration.ioc.IoCs;
-import org.apache.sirona.store.DelegateDataStoreFactory;
-import org.apache.sirona.store.tracking.InMemoryPathTrackingDataStore;
+import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
+import org.apache.sirona.tracking.PathTrackingEntry;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ */
+public class CubePathTrackingDataStore
+    extends BatchPathTrackingDataStore
+{
+    private final Cube cube = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
 
-public class CubeDataStoreFactory extends DelegateDataStoreFactory {
-    public CubeDataStoreFactory() {
-        super(
-            IoCs.processInstance(new CubeCounterDataStore()),
-            IoCs.processInstance(new CubeGaugeDataStore()),
-            IoCs.processInstance(new CubeNodeStatusDataStore()),
-            // FIXME real implementation in Cube. olamy: not sure it's really supported
-            new InMemoryPathTrackingDataStore());
+    @Override
+    protected void pushEntriesByBatch( ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries )
+    {
+        cube.post( cube.pathTrackingSnapshot( pathTrackingEntries ) );
     }
+
+
 }

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java Thu Feb 27 04:08:49 2014
@@ -227,4 +227,5 @@ public class DefaultRepository implement
     public Map<String, NodeStatus> statuses() {
         return nodeStatusDataStore.statuses();
     }
+
 }

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java Thu Feb 27 04:08:49 2014
@@ -52,4 +52,5 @@ public interface Repository {
     Role findGaugeRole(String name);
 
     Map<String, NodeStatus> statuses();
+
 }

Added: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java?rev=1572430&view=auto
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java (added)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sirona.store.tracking;
+
+import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Created;
+import org.apache.sirona.store.BatchFuture;
+import org.apache.sirona.tracking.PathTrackingEntry;
+import org.apache.sirona.util.DaemonThreadFactory;
+
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ */
+public abstract class BatchPathTrackingDataStore
+    extends InMemoryPathTrackingDataStore
+{
+
+    private static final Logger LOGGER = Logger.getLogger( BatchPathTrackingDataStore.class.getName() );
+
+    protected BatchFuture scheduledTask;
+
+    @Created // call it only when main impl not in delegated mode so use IoC lifecycle management
+    public void initBatch()
+    {
+        final String name =
+            getClass().getSimpleName().toLowerCase( Locale.ENGLISH ).replace( "pathtrackingdatastore", "" );
+
+        final long period = getPeriod( name );
+
+        final ScheduledExecutorService ses =
+            Executors.newSingleThreadScheduledExecutor( new DaemonThreadFactory( name + "-pathtracking-schedule-" ) );
+
+        final ScheduledFuture<?> future =
+            ses.scheduleAtFixedRate( new PushGaugesTask(), period, period, TimeUnit.MILLISECONDS );
+
+        scheduledTask = new BatchFuture( ses, future );
+    }
+
+    protected int getPeriod( final String name )
+    {
+        return Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + name + ".pathtracking.period", //
+                                         Configuration.getInteger(
+                                             Configuration.CONFIG_PROPERTY_PREFIX + name + ".period", 60000 ) );
+    }
+
+
+    private class PushGaugesTask
+        implements Runnable
+    {
+        @Override
+        public void run()
+        {
+            try
+            {
+                pushEntriesByBatch( getPathTrackingEntries() );
+                clearEntries();
+            }
+            catch ( final Exception e )
+            {
+                LOGGER.log( Level.SEVERE, e.getMessage(), e );
+            }
+        }
+    }
+
+    protected abstract void pushEntriesByBatch( final ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries );
+
+}

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -92,7 +92,7 @@ public class InMemoryPathTrackingDataSto
     }
 
     @Override
-    public List<String> retrieveTrackingIds( Date startTime, Date endTime )
+    public Collection<String> retrieveTrackingIds( Date startTime, Date endTime )
     {
         List<String> trackingIds = new ArrayList<String>();
         for ( Set<PathTrackingEntry> pathTrackingEntries : this.pathTrackingEntries.values() )
@@ -113,6 +113,13 @@ public class InMemoryPathTrackingDataSto
         return trackingIds;
     }
 
+    @Override
+    public void clearEntries()
+    {
+        pathTrackingEntries =
+            new ConcurrentHashMap<String, Set<PathTrackingEntry>>( 50 );
+    }
+
     protected ConcurrentMap<String, Set<PathTrackingEntry>> getPathTrackingEntries()
     {
         return pathTrackingEntries;

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -31,6 +31,8 @@ public interface PathTrackingDataStore
 
     void store( Collection<PathTrackingEntry> pathTrackingEntries );
 
+    void clearEntries();
+
     /**
      * the result will be orderer by startTime
      *

Modified: incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -30,16 +30,20 @@ import me.prettyprint.hector.api.query.Q
 import org.apache.sirona.cassandra.DynamicDelegatedSerializer;
 import org.apache.sirona.cassandra.collector.CassandraSirona;
 import org.apache.sirona.configuration.ioc.IoCs;
+import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
 import org.apache.sirona.store.tracking.PathTrackingDataStore;
 import org.apache.sirona.tracking.PathTrackingEntry;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
 
 import static org.apache.sirona.cassandra.collector.CassandraSirona.*;
 
@@ -47,6 +51,7 @@ import static org.apache.sirona.cassandr
  *
  */
 public class CassandraPathTrackingDataStore
+    extends BatchPathTrackingDataStore
     implements PathTrackingDataStore
 {
 
@@ -77,7 +82,7 @@ public class CassandraPathTrackingDataSt
     public void store( Collection<PathTrackingEntry> pathTrackingEntries )
     {
 
-        final Mutator<String> mutator = HFactory.createMutator( keyspace, StringSerializer.get() );
+        // FIXME find a more efficient way to store such batch of datas
 
         for ( PathTrackingEntry pathTrackingEntry : pathTrackingEntries )
 
@@ -183,6 +188,19 @@ public class CassandraPathTrackingDataSt
         return entries;
     }
 
+    @Override
+    protected void pushEntriesByBatch( ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries )
+    {
+        List<PathTrackingEntry> entries = new ArrayList<PathTrackingEntry>(  );
+
+        for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet()) {
+            entries.addAll( entry.getValue() );
+        }
+
+
+        store( entries );
+    }
+
     protected Keyspace getKeyspace()
     {
         return keyspace;