You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by ol...@apache.org on 2014/02/27 05:08:50 UTC
svn commit: r1572430 - in /incubator/sirona/trunk:
agent/store/cube/src/main/java/org/apache/sirona/cube/
core/src/main/java/org/apache/sirona/repositories/
core/src/main/java/org/apache/sirona/store/tracking/
server/store/cassandra/src/main/java/org/a...
Author: olamy
Date: Thu Feb 27 04:08:49 2014
New Revision: 1572430
URL: http://svn.apache.org/r1572430
Log:
start adding batch storing for PathTracking
Added:
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (contents, props changed)
- copied, changed from r1572429, incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java (with props)
Modified:
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java
incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/Cube.java Thu Feb 27 04:08:49 2014
@@ -20,6 +20,7 @@ import org.apache.sirona.Role;
import org.apache.sirona.counters.Counter;
import org.apache.sirona.status.NodeStatus;
import org.apache.sirona.status.ValidationResult;
+import org.apache.sirona.tracking.PathTrackingEntry;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
@@ -34,9 +35,11 @@ import java.util.Collection;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -47,6 +50,7 @@ public class Cube {
private static final String GAUGE_TYPE = "gauge";
private static final String VALIDATION_TYPE = "validation";
private static final String STATUS_TYPE = "status";
+ private static final String PATHTRACKING_TYPE = "pathtracking";
private static final String NAME = "name";
private static final String ROLE = "role";
@@ -60,6 +64,20 @@ public class Cube {
private static final String SUM = "sum";
private static final String M_2 = "m2";
+ private static final String TRACKING_D = "trackingId";
+
+ private static final String NODE_ID = "nodeId";
+
+ private static final String CLASSNAME = "className";
+
+ private static final String METHOD_NAME = "methodName";
+
+ private static final String START_TIME = "startTime";
+
+ private static final String EXEC_TIME = "executionTime";
+
+ private static final String LEVEL = "level";
+
private static final String JSON_BASE = "{" +
"\"type\": \"%s\"," +
"\"time\": \"%s\"," +
@@ -230,6 +248,32 @@ public class Cube {
return events;
}
+ public StringBuilder pathTrackingSnapshot( final ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries ) {
+ final StringBuilder events = newEventStream();
+ final long ts = System.currentTimeMillis();
+ for (final Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet() ) {
+
+ for (PathTrackingEntry pathTrackingEntry : entry.getValue()){
+
+ buildEvent( events, PATHTRACKING_TYPE, ts, new MapBuilder()
+ .add( TRACKING_D, pathTrackingEntry.getTrackingId() )
+ .add( NODE_ID, pathTrackingEntry.getNodeId() )
+ .add( CLASSNAME, pathTrackingEntry.getClassName())
+ .add( METHOD_NAME, pathTrackingEntry.getMethodName())
+ .add( START_TIME, pathTrackingEntry.getStartTime())
+ .add( EXEC_TIME, pathTrackingEntry.getExecutionTime())
+ .add( LEVEL, pathTrackingEntry.getLevel())
+ .map()
+ );
+
+ }
+
+ }
+
+
+ return events;
+ }
+
public StringBuilder gaugeSnapshot(final StringBuilder base, final long time, final Role role, final double value) {
return buildEvent(base, GAUGE_TYPE, time,
new MapBuilder()
@@ -255,4 +299,6 @@ public class Cube {
}
return events;
}
+
+
}
Modified: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java Thu Feb 27 04:08:49 2014
@@ -26,7 +26,6 @@ public class CubeDataStoreFactory extend
IoCs.processInstance(new CubeCounterDataStore()),
IoCs.processInstance(new CubeGaugeDataStore()),
IoCs.processInstance(new CubeNodeStatusDataStore()),
- // FIXME real implementation in Cube. olamy: not sure it's really supported
- new InMemoryPathTrackingDataStore());
+ IoCs.processInstance(new CubePathTrackingDataStore()));
}
}
Copied: incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java (from r1572429, incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java)
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java?p2=incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java&p1=incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java&r1=1572429&r2=1572430&rev=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubeDataStoreFactory.java (original)
+++ incubator/sirona/trunk/agent/store/cube/src/main/java/org/apache/sirona/cube/CubePathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -14,19 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.sirona.cube;
import org.apache.sirona.configuration.ioc.IoCs;
-import org.apache.sirona.store.DelegateDataStoreFactory;
-import org.apache.sirona.store.tracking.InMemoryPathTrackingDataStore;
+import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
+import org.apache.sirona.tracking.PathTrackingEntry;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ */
+public class CubePathTrackingDataStore
+ extends BatchPathTrackingDataStore
+{
+ private final Cube cube = IoCs.findOrCreateInstance( CubeBuilder.class ).build();
-public class CubeDataStoreFactory extends DelegateDataStoreFactory {
- public CubeDataStoreFactory() {
- super(
- IoCs.processInstance(new CubeCounterDataStore()),
- IoCs.processInstance(new CubeGaugeDataStore()),
- IoCs.processInstance(new CubeNodeStatusDataStore()),
- // FIXME real implementation in Cube. olamy: not sure it's really supported
- new InMemoryPathTrackingDataStore());
+ @Override
+ protected void pushEntriesByBatch( ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries )
+ {
+ cube.post( cube.pathTrackingSnapshot( pathTrackingEntries ) );
}
+
+
}
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/DefaultRepository.java Thu Feb 27 04:08:49 2014
@@ -227,4 +227,5 @@ public class DefaultRepository implement
public Map<String, NodeStatus> statuses() {
return nodeStatusDataStore.statuses();
}
+
}
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/repositories/Repository.java Thu Feb 27 04:08:49 2014
@@ -52,4 +52,5 @@ public interface Repository {
Role findGaugeRole(String name);
Map<String, NodeStatus> statuses();
+
}
Added: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java?rev=1572430&view=auto
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java (added)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sirona.store.tracking;
+
+import org.apache.sirona.configuration.Configuration;
+import org.apache.sirona.configuration.ioc.Created;
+import org.apache.sirona.store.BatchFuture;
+import org.apache.sirona.tracking.PathTrackingEntry;
+import org.apache.sirona.util.DaemonThreadFactory;
+
+import java.util.Locale;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ *
+ */
+public abstract class BatchPathTrackingDataStore
+ extends InMemoryPathTrackingDataStore
+{
+
+ private static final Logger LOGGER = Logger.getLogger( BatchPathTrackingDataStore.class.getName() );
+
+ protected BatchFuture scheduledTask;
+
+ @Created // call it only when the main impl is not in delegated mode, so use IoC lifecycle management
+ public void initBatch()
+ {
+ final String name =
+ getClass().getSimpleName().toLowerCase( Locale.ENGLISH ).replace( "pathtrackingdatastore", "" );
+
+ final long period = getPeriod( name );
+
+ final ScheduledExecutorService ses =
+ Executors.newSingleThreadScheduledExecutor( new DaemonThreadFactory( name + "-pathtracking-schedule-" ) );
+
+ final ScheduledFuture<?> future =
+ ses.scheduleAtFixedRate( new PushGaugesTask(), period, period, TimeUnit.MILLISECONDS );
+
+ scheduledTask = new BatchFuture( ses, future );
+ }
+
+ protected int getPeriod( final String name )
+ {
+ return Configuration.getInteger( Configuration.CONFIG_PROPERTY_PREFIX + name + ".pathtracking.period", //
+ Configuration.getInteger(
+ Configuration.CONFIG_PROPERTY_PREFIX + name + ".period", 60000 ) );
+ }
+
+
+ private class PushGaugesTask
+ implements Runnable
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ pushEntriesByBatch( getPathTrackingEntries() );
+ clearEntries();
+ }
+ catch ( final Exception e )
+ {
+ LOGGER.log( Level.SEVERE, e.getMessage(), e );
+ }
+ }
+ }
+
+ protected abstract void pushEntriesByBatch( final ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries );
+
+}
Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/BatchPathTrackingDataStore.java
------------------------------------------------------------------------------
svn:keywords = Author Date Id Revision
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/InMemoryPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -92,7 +92,7 @@ public class InMemoryPathTrackingDataSto
}
@Override
- public List<String> retrieveTrackingIds( Date startTime, Date endTime )
+ public Collection<String> retrieveTrackingIds( Date startTime, Date endTime )
{
List<String> trackingIds = new ArrayList<String>();
for ( Set<PathTrackingEntry> pathTrackingEntries : this.pathTrackingEntries.values() )
@@ -113,6 +113,13 @@ public class InMemoryPathTrackingDataSto
return trackingIds;
}
+ @Override
+ public void clearEntries()
+ {
+ pathTrackingEntries =
+ new ConcurrentHashMap<String, Set<PathTrackingEntry>>( 50 );
+ }
+
protected ConcurrentMap<String, Set<PathTrackingEntry>> getPathTrackingEntries()
{
return pathTrackingEntries;
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/store/tracking/PathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -31,6 +31,8 @@ public interface PathTrackingDataStore
void store( Collection<PathTrackingEntry> pathTrackingEntries );
+ void clearEntries();
+
/**
* the result will be ordered by startTime
*
Modified: incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java?rev=1572430&r1=1572429&r2=1572430&view=diff
==============================================================================
--- incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java (original)
+++ incubator/sirona/trunk/server/store/cassandra/src/main/java/org/apache/sirona/cassandra/pathtracking/CassandraPathTrackingDataStore.java Thu Feb 27 04:08:49 2014
@@ -30,16 +30,20 @@ import me.prettyprint.hector.api.query.Q
import org.apache.sirona.cassandra.DynamicDelegatedSerializer;
import org.apache.sirona.cassandra.collector.CassandraSirona;
import org.apache.sirona.configuration.ioc.IoCs;
+import org.apache.sirona.store.tracking.BatchPathTrackingDataStore;
import org.apache.sirona.store.tracking.PathTrackingDataStore;
import org.apache.sirona.tracking.PathTrackingEntry;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
import static org.apache.sirona.cassandra.collector.CassandraSirona.*;
@@ -47,6 +51,7 @@ import static org.apache.sirona.cassandr
*
*/
public class CassandraPathTrackingDataStore
+ extends BatchPathTrackingDataStore
implements PathTrackingDataStore
{
@@ -77,7 +82,7 @@ public class CassandraPathTrackingDataSt
public void store( Collection<PathTrackingEntry> pathTrackingEntries )
{
- final Mutator<String> mutator = HFactory.createMutator( keyspace, StringSerializer.get() );
+ // FIXME find a more efficient way to store such a batch of data
for ( PathTrackingEntry pathTrackingEntry : pathTrackingEntries )
@@ -183,6 +188,19 @@ public class CassandraPathTrackingDataSt
return entries;
}
+ @Override
+ protected void pushEntriesByBatch( ConcurrentMap<String, Set<PathTrackingEntry>> pathTrackingEntries )
+ {
+ List<PathTrackingEntry> entries = new ArrayList<PathTrackingEntry>( );
+
+ for ( Map.Entry<String, Set<PathTrackingEntry>> entry : pathTrackingEntries.entrySet()) {
+ entries.addAll( entry.getValue() );
+ }
+
+
+ store( entries );
+ }
+
protected Keyspace getKeyspace()
{
return keyspace;