You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by cs...@apache.org on 2016/08/08 16:47:18 UTC

[1/2] karaf-decanter git commit: Improved cassandra appender

Repository: karaf-decanter
Updated Branches:
  refs/heads/master f3f93755b -> b9438dc3e


Improved cassandra appender


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/72db51a4
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/72db51a4
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/72db51a4

Branch: refs/heads/master
Commit: 72db51a498a9a9208c1ffb35eaa509f7ecef461a
Parents: f3f9375
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Aug 8 18:46:38 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Mon Aug 8 18:46:38 2016 +0200

----------------------------------------------------------------------
 appender/cassandra/pom.xml                      |  13 ++-
 .../appender/cassandra/CassandraAppender.java   | 107 +++++++++----------
 .../cassandra/CassandraAppenderTest.java        |  16 ++-
 3 files changed, 70 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/appender/cassandra/pom.xml b/appender/cassandra/pom.xml
index 68ea240..861bae1 100644
--- a/appender/cassandra/pom.xml
+++ b/appender/cassandra/pom.xml
@@ -33,13 +33,10 @@
 
     <dependencies>
 
-        <!-- Decanter API -->
         <dependency>
             <groupId>org.apache.karaf.decanter</groupId>
             <artifactId>org.apache.karaf.decanter.api</artifactId>
         </dependency>
-
-        <!-- SLF4J -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
@@ -64,8 +61,14 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
-		
-		<dependency>
+
+        <dependency>
+        	<groupId>org.slf4j</groupId>
+        	<artifactId>slf4j-jdk14</artifactId>
+        	<version>1.7.21</version>
+        	<scope>test</scope>
+        </dependency>
+        <dependency>
 			<groupId>org.apache.cassandra</groupId>
 			<artifactId>cassandra-all</artifactId>
 			<version>${cassandra.version}</version>

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
----------------------------------------------------------------------
diff --git a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
index 6175f87..e3e7d17 100644
--- a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
+++ b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
@@ -23,6 +23,7 @@ import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
@@ -46,40 +47,41 @@ public class CassandraAppender implements EventHandler {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class);
 
-    private String cassandraHost;
-    private Integer cassandraPort;
-    private String cassandraUser;
-    private String cassandraPassword;
     private String keyspace;
     private String tableName;
     private Marshaller marshaller;
 
-    private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS TABLENAME (timeStamp timestamp PRIMARY KEY, content Text);";
+    private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS %s (timeStamp timestamp PRIMARY KEY, content Text);";
 
-    private final static String upsertQueryTemplate = "INSERT INTO TABLENAME(timeStamp, content) VALUES(?,?);";
+    private final static String insertTemplate = "INSERT INTO %s (timeStamp, content) VALUES(?,?);";
+
+    private Cluster cluster;
 
     public CassandraAppender() {
     }
     
-    public CassandraAppender(Marshaller marshaller, String keyspace, String tableName, String cassandraHost,
-            Integer cassandraPort, String cassandraUser, String cassandraPassword) {
-        this.marshaller = marshaller;
-        this.keyspace = keyspace;
-        this.tableName = tableName;
-        this.cassandraHost = cassandraHost;
-        this.cassandraPort = cassandraPort;
-        this.cassandraUser = cassandraUser;
-        this.cassandraPassword = cassandraPassword;
-    }
-    
     @SuppressWarnings("unchecked")
     @Activate
     public void activate(ComponentContext context) {
         Dictionary<String, Object> config = context.getProperties();
+        activate(config);
+    }
+
+    void activate(Dictionary<String, Object> config) {
         this.keyspace = getValue(config, "keyspace.name", "decanter");
         this.tableName = getValue(config, "table.name", "decanter");
-        this.cassandraHost = getValue(config, "cassandra.host", "localhost");
-        this.cassandraPort = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+        String host = getValue(config, "cassandra.host", "localhost");
+        Integer port = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+        Builder clusterBuilder = Cluster.builder().addContactPoint(host);
+        if (port != null) {
+            clusterBuilder.withPort(port);
+        }
+        cluster = clusterBuilder.build();
+    }
+    
+    @Deactivate
+    public void deactivate() {
+        cluster.close();
     }
     
     private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -90,40 +92,16 @@ public class CassandraAppender implements EventHandler {
     @Override
     public void handleEvent(Event event) {
         LOGGER.trace("Looking for the Cassandra datasource");
-        try (Session session = createSession()){
-            ResultSet execute;
-            try {
-                execute = session.execute("USE " + keyspace + ";");
-            } catch (InvalidQueryException e) {
-                session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };");
-                session.execute("USE " + keyspace + ";");
-            }
-
-            execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';");
-            List<Row> all = execute.all();
-            boolean found = false;
-            for(Row row : all) {
-                String table = row.getString("columnfamily_name");
-                if (table.equalsIgnoreCase(tableName)) {
-                    found = true;
-                    break;
-                }
-            }
-            if (!found) {
-                session.execute(createTableTemplate.replace("TABLENAME", tableName));
-                LOGGER.debug("Table {} has been created", tableName);
-            }
+        try (Session session = cluster.connect()){
+            useKeyspace(session, keyspace);
+            createTable(session, keyspace, tableName);
 
             Long timestamp = (Long) event.getProperty("timestamp");
-            java.util.Date date = timestamp != null ? new java.util.Date(timestamp) : new java.util.Date();
-            String jsonSt = marshaller.marshal(event);
-
-            String upsertQuery = upsertQueryTemplate.replaceAll("TABLENAME", tableName);
-
             if (timestamp == null) {
                 timestamp = System.currentTimeMillis();
             }
-            session.execute(upsertQuery, timestamp, jsonSt);
+            String jsonSt = marshaller.marshal(event);
+            session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
 
             LOGGER.trace("Data inserted into {} table", tableName);
         } catch (Exception e) {
@@ -131,17 +109,32 @@ public class CassandraAppender implements EventHandler {
         }
     }
 
-    private Session createSession() {
-        Session session;
-        Builder clusterBuilder = Cluster.builder().addContactPoint(cassandraHost);
-        if (cassandraPort != null) {
-            clusterBuilder.withPort(cassandraPort);
+    private static void useKeyspace(Session session, String keyspace) {
+        try {
+            session.execute("USE " + keyspace + ";");
+        } catch (InvalidQueryException e) {
+            session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };");
+            session.execute("USE " + keyspace + ";");
         }
-        Cluster cluster = clusterBuilder.build();
-        session = cluster.connect();
-        return session;
     }
-    
+
+    private static void createTable(Session session, String keyspace, String tableName) {
+        ResultSet execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';");
+        List<Row> all = execute.all();
+        boolean found = false;
+        for(Row row : all) {
+            String table = row.getString("columnfamily_name");
+            if (table.equalsIgnoreCase(tableName)) {
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            session.execute(String.format(createTableTemplate, tableName));
+            LOGGER.debug("Table {} has been created", tableName);
+        }
+    }
+
     @Reference
     public void setMarshaller(Marshaller marshaller) {
         this.marshaller = marshaller;

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/72db51a4/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
----------------------------------------------------------------------
diff --git a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
index 5cd9e81..ec28736 100644
--- a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
+++ b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
@@ -5,7 +5,9 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
 
+import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
@@ -30,7 +32,7 @@ import com.datastax.driver.core.Session;
 public class CassandraAppenderTest {
 
     private static final String KEYSPACE = "decanter";
-    private static final int CASSANDRA_PORT = 9142;
+    private static final String CASSANDRA_PORT = "9142";
     private static final String CASSANDRA_HOST = "localhost";
     private static final String TABLE_NAME = "decanter";
     private static final String TOPIC = "decanter/collect/jmx";
@@ -68,8 +70,14 @@ public class CassandraAppenderTest {
     @Test
     public void testHandleEvent() throws Exception {
         Marshaller marshaller = new JsonMarshaller();
-        CassandraAppender appender = new CassandraAppender(marshaller, KEYSPACE, TABLE_NAME, CASSANDRA_HOST, 
-                                                           CASSANDRA_PORT, null, null);
+        CassandraAppender appender = new CassandraAppender();
+        Dictionary<String, Object> config = new Hashtable<String, Object>();
+        config.put("cassandra.host", CASSANDRA_HOST);
+        config.put("cassandra.port", CASSANDRA_PORT);
+        config.put("keyspace.name", KEYSPACE);
+        config.put("table.name", TABLE_NAME);
+        appender.setMarshaller(marshaller);
+        appender.activate(config);
         
         Map<String, Object> properties = new HashMap<>();
         properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
@@ -90,7 +98,7 @@ public class CassandraAppenderTest {
 
     private Session getSesion() {
         Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST);
-        clusterBuilder.withPort(CASSANDRA_PORT);
+        clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT));
 
         Cluster cluster = clusterBuilder.build();
         return cluster.connect();


[2/2] karaf-decanter git commit: Handle warnings

Posted by cs...@apache.org.
Handle warnings


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/b9438dc3
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/b9438dc3
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/b9438dc3

Branch: refs/heads/master
Commit: b9438dc3ee725bb904649923a8c60f3609cf3cc5
Parents: 72db51a
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Mon Aug 8 18:47:01 2016 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Mon Aug 8 18:47:01 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/karaf/decanter/collector/log/LogAppender.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/b9438dc3/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
----------------------------------------------------------------------
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
index 62d5692..c359d11 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
@@ -54,6 +54,7 @@ public class LogAppender implements PaxAppender {
 
     private EventAdmin dispatcher;
 
+    @SuppressWarnings("unchecked")
     @Activate
     public void activate(ComponentContext context) {
         this.properties = context.getProperties();