You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/10/31 07:13:59 UTC
git commit: FLUME-2206. ElasticSearchSink ttl field modification to
mimic Elasticsearch way of specifying TTL
Updated Branches:
refs/heads/trunk 3cc8cec0e -> 6dfe63cdc
FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL
(Dib Ghosh via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6dfe63cd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6dfe63cd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6dfe63cd
Branch: refs/heads/trunk
Commit: 6dfe63cdcebaa5f8091b4789f4df5f679ccb3596
Parents: 3cc8cec
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 30 23:13:09 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Oct 30 23:13:09 2013 -0700
----------------------------------------------------------------------
.gitignore | 1 +
flume-ng-doc/sphinx/FlumeUserGuide.rst | 7 ++-
.../sink/elasticsearch/ElasticSearchSink.java | 50 +++++++++++++++++++-
.../ElasticSearchSinkConstants.java | 1 +
.../elasticsearch/TestElasticSearchSink.java | 36 ++++++++++++++
5 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ef0a495..b387391 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ derby.log
.idea
*.iml
nb-configuration.xml
+.DS_Store
http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a768383..3a3038c 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1985,7 +1985,10 @@ indexType logs
clusterName elasticsearch Name of the ElasticSearch cluster to connect to
batchSize 100 Number of events to be written per txn.
ttl -- TTL in days, when set will cause the expired documents to be deleted automatically,
- if not set documents will never be automatically deleted
+ if not set documents will never be automatically deleted. TTL is accepted either in the earlier
+ integer-only form, which is interpreted as days (e.g. a1.sinks.k1.ttl = 5), or with a unit qualifier:
+ ms (millisecond), s (second), m (minute), h (hour), d (day) or w (week). For example,
+ a1.sinks.k1.ttl = 5d sets the TTL to 5 days. See
+ http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of
either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* -- Properties to be passed to the serializer.
@@ -2003,7 +2006,7 @@ Example for agent named a1:
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
- a1.sinks.k1.ttl = 5
+ a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 3d01173..e38ab19 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -31,9 +31,12 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
@@ -98,6 +101,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
private String clusterName = DEFAULT_CLUSTER_NAME;
private String indexName = DEFAULT_INDEX_NAME;
private String indexType = DEFAULT_INDEX_TYPE;
+ // NOTE(review): TTL_REGEX matches ^(\d+)(\D*) and contains no letters, so CASE_INSENSITIVE
+ // has no effect here; case handling of the unit qualifier depends on how group 2 is compared.
+ private final Pattern pattern
+ = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
+ // A Matcher holds per-match state and is not thread-safe; do not share one instance
+ // across concurrent calls.
+ private Matcher matcher = pattern.matcher("");
private InetSocketTransportAddress[] serverAddresses;
@@ -269,8 +275,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
}
if (StringUtils.isNotBlank(context.getString(TTL))) {
- this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context
- .getString(TTL)));
+ this.ttlMs = parseTTL(context.getString(TTL));
Preconditions.checkState(ttlMs > 0, TTL
+ " must be greater than 0 or not set.");
}
@@ -354,6 +359,47 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
}
/*
+   * Parses the user-supplied TTL setting into milliseconds.
+   * <p>
+   * The value is a non-negative integer optionally followed by a unit
+   * qualifier, mirroring the Elasticsearch _ttl field syntax: "ms"
+   * (milliseconds), "s" (seconds), "m" (minutes), "h" (hours), "d" (days)
+   * or "w" (weeks). Qualifiers are matched case-insensitively and
+   * surrounding whitespace is ignored. Unlike Elasticsearch, whose default
+   * unit is milliseconds, a bare integer is interpreted as days so that the
+   * earlier integer-only form of this setting keeps its meaning.
+   * <p>
+   * A value that does not start with digits, or that carries an unknown
+   * qualifier, is logged and yields 0; configure() then rejects it through
+   * its "must be greater than 0" precondition.
+   * See http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+   *
+   * @param ttl TTL value provided by user in flume configuration file for the sink
+   * @return the TTL in milliseconds, or 0 if the value could not be interpreted
+   * @throws NumberFormatException if the numeric part does not fit in a long
+   */
+  private long parseTTL(String ttl) {
+    if (ttl == null) {
+      logger.info("TTL not provided. Skipping the TTL config by returning 0.");
+      return 0;
+    }
+    // Use a local Matcher rather than resetting shared field state.
+    Matcher ttlMatcher = pattern.matcher(ttl.trim());
+    // TTL_REGEX is anchored with '^', so there is at most one match.
+    if (!ttlMatcher.find()) {
+      logger.warn("Unable to parse TTL '" + ttl
+          + "'. Skipping the TTL config by returning 0.");
+      return 0;
+    }
+    // Parse as long: Integer.parseInt rejected values above 2^31-1, and the
+    // int multiplication for weeks could silently overflow to a negative TTL.
+    long amount = Long.parseLong(ttlMatcher.group(1));
+    String unit = ttlMatcher.group(2).trim();
+    if (unit.length() == 0) {
+      logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
+      return TimeUnit.DAYS.toMillis(amount);
+    } else if (unit.equalsIgnoreCase("ms")) {
+      return amount;
+    } else if (unit.equalsIgnoreCase("s")) {
+      return TimeUnit.SECONDS.toMillis(amount);
+    } else if (unit.equalsIgnoreCase("m")) {
+      return TimeUnit.MINUTES.toMillis(amount);
+    } else if (unit.equalsIgnoreCase("h")) {
+      return TimeUnit.HOURS.toMillis(amount);
+    } else if (unit.equalsIgnoreCase("d")) {
+      return TimeUnit.DAYS.toMillis(amount);
+    } else if (unit.equalsIgnoreCase("w")) {
+      // TimeUnit.toMillis saturates at Long.MAX_VALUE, but the "* 7" happens
+      // before the conversion and must be guarded separately.
+      return amount > Long.MAX_VALUE / 7
+          ? Long.MAX_VALUE : TimeUnit.DAYS.toMillis(7 * amount);
+    } else {
+      logger.warn("Unknown TTL qualifier '" + unit + "' in TTL '" + ttl
+          + "'. Setting TTL to 0.");
+      return 0;
+    }
+  }
+
+ /*
* FOR TESTING ONLY...
*
* Opens a local discovery node for talking to an elasticsearch server running
http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
index 7f75e22..dd0c59d 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -78,4 +78,5 @@ public class ElasticSearchSinkConstants {
public static final String DEFAULT_INDEX_NAME = "flume";
public static final String DEFAULT_INDEX_TYPE = "log";
public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+ // Leading digits (group 1) followed by an optional run of non-digit characters (group 2, the
+ // unit qualifier such as "ms" or "d"). Only the start is anchored; nothing after a later digit is matched.
+ public static final String TTL_REGEX = "^(\\d+)(\\D*)";
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 3f2ec6e..71789e8 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -31,6 +31,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
@@ -286,6 +288,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
}
+  @Test
+  public void shouldParseFullyQualifiedTTLs() {
+    // Expected values are literal milliseconds so they are checked
+    // independently of the TimeUnit conversions used by the sink.
+    Map<String, Long> testTTLMap = new HashMap<String, Long>();
+    testTTLMap.put("1ms", 1L);
+    testTTLMap.put("1s", 1000L);
+    testTTLMap.put("1m", 60000L);
+    testTTLMap.put("1h", 3600000L);
+    testTTLMap.put("1d", 86400000L);
+    testTTLMap.put("1w", 604800000L);
+    // A bare integer keeps its historical meaning of days.
+    testTTLMap.put("1", 86400000L);
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+
+    InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT)};
+
+    for (Map.Entry<String, Long> entry : testTTLMap.entrySet()) {
+      String ttl = entry.getKey();
+      parameters.put(TTL, ttl);
+      fixture = new ElasticSearchSink();
+      fixture.configure(new Context(parameters));
+
+      assertEquals("testing-cluster-name", fixture.getClusterName());
+      assertEquals("testing-index-name", fixture.getIndexName());
+      assertEquals("testing-index-type", fixture.getIndexType());
+      // The message identifies which TTL string failed to parse as expected.
+      assertEquals("TTL " + ttl, entry.getValue().longValue(), fixture.getTTLMs());
+      assertArrayEquals(expected, fixture.getServerAddresses());
+    }
+  }
+
public static final class CustomElasticSearchIndexRequestBuilderFactory
extends AbstractElasticSearchIndexRequestBuilderFactory {