You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/03/03 09:29:14 UTC
[karaf-decanter] branch master updated: fix batch options,
get batch options from config, get tags from config,
filter junk properties from events
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new 6adb891 fix batch options, get batch options from config, get tags from config, filter junk properties from events
new a84504b Merge pull request #123 from YotpoLtd/influxdb-appender
6adb891 is described below
commit 6adb8913abb4e0fef2eaf3186faa13300a9baf50
Author: Gilad Weinbach <gw...@yotpo.com>
AuthorDate: Thu Feb 27 08:02:15 2020 +0200
fix batch options, get batch options from config, get tags from config, filter junk properties from events
---
...org.apache.karaf.decanter.appender.influxdb.cfg | 12 ++++++-
.../appender/influxdb/InfluxDbAppender.java | 41 +++++++++++++++++++---
2 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg b/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
index 6e4b28d..eb9eb36 100644
--- a/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
+++ b/appender/influxdb/src/main/cfg/org.apache.karaf.decanter.appender.influxdb.cfg
@@ -31,4 +31,14 @@ url=http://localhost:8086
#password=
# InfluxDB database name
-database=decanter
\ No newline at end of file
+database=decanter
+
+# InfluxDB Batch Options
+# batchActionsLimit=200
+# precision=MILLISECONDS
+# flushDuration=100
+
+# InfluxDB tags to be sent with each point.
+# Multiple tags can be specified, one per "tag.<name>" property.
+# tag.key1=val1
+# tag.key2=val2
\ No newline at end of file
diff --git a/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java b/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
index 783f051..b1b4aac 100644
--- a/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
+++ b/appender/influxdb/src/main/java/org/apache/karaf/decanter/appender/influxdb/InfluxDbAppender.java
@@ -17,6 +17,7 @@
package org.apache.karaf.decanter.appender.influxdb;
import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
@@ -29,6 +30,7 @@ import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import java.util.Dictionary;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class InfluxDbAppender implements EventHandler {
private Dictionary<String, Object> config;
+ private Map<String, String> tags = new HashMap<>();
+
private InfluxDB influxDB;
@Activate
@@ -72,8 +76,33 @@ public class InfluxDbAppender implements EventHandler {
if (config.get("database") != null) {
database = (String) config.get("database");
}
- this.influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);
- this.influxDB.setRetentionPolicy("defaultPolicy");
+
+ BatchOptions batchOptions = BatchOptions.DEFAULTS;
+ if (config.get("batchActionsLimit") != null) {
+ int batchActionsLimit = Integer.parseInt((String) config.get("batchActionsLimit"));
+ batchOptions = batchOptions.actions(batchActionsLimit);
+ }
+ if (config.get("precision") != null) {
+ TimeUnit timeUnit = TimeUnit.valueOf((String) config.get("precision"));
+ batchOptions = batchOptions.precision(timeUnit);
+ }
+
+ if (config.get("flushDuration") != null) {
+ int flushDuration = Integer.parseInt((String) config.get("flushDuration"));
+ batchOptions = batchOptions.flushDuration(flushDuration);
+ }
+
+ String prefix = "tag.";
+ for (Enumeration<String> e = config.keys(); e.hasMoreElements(); ) {
+ String key = e.nextElement();
+ if (key.startsWith(prefix)) {
+ Object value = this.config.get(key);
+ if (value != null)
+ tags.put(key.substring(4), value.toString());
+ }
+ }
+
+ this.influxDB.enableBatch(batchOptions);
this.influxDB.setDatabase(database);
}
@@ -94,9 +123,13 @@ public class InfluxDbAppender implements EventHandler {
Map<String, Object> data = new HashMap<>();
for (String propertyName : event.getPropertyNames()) {
Object propertyValue = event.getProperty(propertyName);
- data.put(propertyName, propertyValue);
+ if (propertyValue != null) {
+ if (propertyValue instanceof Number || propertyValue instanceof String || propertyValue instanceof Boolean) {
+ data.put(propertyName, propertyValue);
+ }
+ }
}
- Point point = Point.measurement(type).fields(data).build();
+ Point point = Point.measurement(type).fields(data).tag(tags).build();
influxDB.write(point);
}
}