You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/03/03 09:29:14 UTC

[karaf-decanter] branch master updated: fix batch options, get batch options from config, get tags from config, filter junk properties from events

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new 6adb891  fix batch options, get batch options from config, get tags from config, filter junk properties from events
     new a84504b  Merge pull request #123 from YotpoLtd/influxdb-appender
6adb891 is described below

commit 6adb8913abb4e0fef2eaf3186faa13300a9baf50
Author: Gilad Weinbach <gw...@yotpo.com>
AuthorDate: Thu Feb 27 08:02:15 2020 +0200

    fix batch options, get batch options from config, get tags from config, filter junk properties from events
---
 ...org.apache.karaf.decanter.appender.influxdb.cfg | 12 ++++++-
 .../appender/influxdb/InfluxDbAppender.java        | 41 +++++++++++++++++++---
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg b/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
index 6e4b28d..eb9eb36 100644
--- a/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
+++ b/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
@@ -31,4 +31,14 @@ url=http://localhost:8086
 #password=
 
 # InfluxDB database name
-database=decanter
\ No newline at end of file
+database=decanter
+
+# InfluxDB Batch Options
+# batchActionsLimit=200
+# precision=MILLISECONDS
+# flushDuration=100
+
+# InfluxDB tags to be sent with each point.
+# Define one "tag.<name>=<value>" entry per tag; several tags can be specified.
+# tag.key1=val1
+# tag.key2=val2
\ No newline at end of file
diff --git a/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java b/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
index 783f051..b1b4aac 100644
--- a/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
+++ b/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
@@ -17,6 +17,7 @@
 package org.apache.karaf.decanter.appender.influxdb;
 
 import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.influxdb.BatchOptions;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.influxdb.dto.Point;
@@ -29,6 +30,7 @@ import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
 
 import java.util.Dictionary;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class InfluxDbAppender implements EventHandler {
 
     private Dictionary<String, Object> config;
 
+    private Map<String, String> tags = new HashMap<>();
+
     private InfluxDB influxDB;
 
     @Activate
@@ -72,8 +76,33 @@ public class InfluxDbAppender implements EventHandler {
         if (config.get("database") != null) {
             database = (String) config.get("database");
         }
-        this.influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);
-        this.influxDB.setRetentionPolicy("defaultPolicy");
+
+        BatchOptions batchOptions = BatchOptions.DEFAULTS;
+        if (config.get("batchActionsLimit") != null) {
+            int batchActionsLimit = Integer.parseInt((String) config.get("batchActionsLimit"));
+            batchOptions = batchOptions.actions(batchActionsLimit);
+        }
+        if (config.get("precision") != null) {
+            TimeUnit timeUnit = TimeUnit.valueOf((String) config.get("precision"));
+            batchOptions = batchOptions.precision(timeUnit);
+        }
+
+        if (config.get("flushDuration") != null) {
+            int flushDuration = Integer.parseInt((String) config.get("flushDuration"));
+            batchOptions = batchOptions.flushDuration(flushDuration);
+        }
+
+        String prefix = "tag.";
+        for (Enumeration<String> e = config.keys(); e.hasMoreElements(); ) {
+            String key = e.nextElement();
+            if (key.startsWith(prefix)) {
+                Object value = this.config.get(key);
+                if (value != null)
+                    tags.put(key.substring(4), value.toString());
+            }
+        }
+
+        this.influxDB.enableBatch(batchOptions);
         this.influxDB.setDatabase(database);
     }
 
@@ -94,9 +123,13 @@ public class InfluxDbAppender implements EventHandler {
             Map<String, Object> data = new HashMap<>();
             for (String propertyName : event.getPropertyNames()) {
                 Object propertyValue = event.getProperty(propertyName);
-                data.put(propertyName, propertyValue);
+                if (propertyValue != null) {
+                    if (propertyValue instanceof Number || propertyValue instanceof String || propertyValue instanceof Boolean) {
+                        data.put(propertyName, propertyValue);
+                    }
+                }
             }
-            Point point = Point.measurement(type).fields(data).build();
+            Point point = Point.measurement(type).fields(data).tag(tags).build();
             influxDB.write(point);
         }
     }