You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/10/31 07:13:59 UTC

git commit: FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL

Updated Branches:
  refs/heads/trunk 3cc8cec0e -> 6dfe63cdc


FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL

(Dib Ghosh via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6dfe63cd
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6dfe63cd
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6dfe63cd

Branch: refs/heads/trunk
Commit: 6dfe63cdcebaa5f8091b4789f4df5f679ccb3596
Parents: 3cc8cec
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 30 23:13:09 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Oct 30 23:13:09 2013 -0700

----------------------------------------------------------------------
 .gitignore                                      |  1 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  7 ++-
 .../sink/elasticsearch/ElasticSearchSink.java   | 50 +++++++++++++++++++-
 .../ElasticSearchSinkConstants.java             |  1 +
 .../elasticsearch/TestElasticSearchSink.java    | 36 ++++++++++++++
 5 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ef0a495..b387391 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ derby.log
 .idea
 *.iml
 nb-configuration.xml
+.DS_Store

http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index a768383..3a3038c 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1985,7 +1985,10 @@ indexType         logs
 clusterName       elasticsearch                                                            Name of the ElasticSearch cluster to connect to
 batchSize         100                                                                      Number of events to be written per txn.
 ttl               --                                                                       TTL in days, when set will cause the expired documents to be deleted automatically,
-                                                                                           if not set documents will never be automatically deleted
+                                                                                           if not set documents will never be automatically deleted. TTL is accepted both in the earlier form of
+                                                                                           integer only (interpreted as days), e.g. a1.sinks.k1.ttl = 5, and also with a qualifier ms (millisecond), s (second), m (minute),
+                                                                                           h (hour), d (day) and w (week). Example: a1.sinks.k1.ttl = 5d will set TTL to 5 days. See
+                                                                                           http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information.
 serializer        org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of
                                                                                            either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred.
 serializer.*      --                                                                       Properties to be passed to the serializer.
@@ -2003,7 +2006,7 @@ Example for agent named a1:
   a1.sinks.k1.indexType = bar_type
   a1.sinks.k1.clusterName = foobar_cluster
   a1.sinks.k1.batchSize = 500
-  a1.sinks.k1.ttl = 5
+  a1.sinks.k1.ttl = 5d
   a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
   a1.sinks.k1.channel = c1
 

http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 3d01173..e38ab19 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -31,9 +31,12 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX;
 
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
@@ -98,6 +101,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   private String clusterName = DEFAULT_CLUSTER_NAME;
   private String indexName = DEFAULT_INDEX_NAME;
   private String indexType = DEFAULT_INDEX_TYPE;
+  private final Pattern pattern
+    = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE);
+  private Matcher matcher = pattern.matcher("");
 
   private InetSocketTransportAddress[] serverAddresses;
 
@@ -269,8 +275,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
     }
 
     if (StringUtils.isNotBlank(context.getString(TTL))) {
-      this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context
-          .getString(TTL)));
+      this.ttlMs = parseTTL(context.getString(TTL));
       Preconditions.checkState(ttlMs > 0, TTL
           + " must be greater than 0 or not set.");
     }
@@ -354,6 +359,47 @@ public class ElasticSearchSink extends AbstractSink implements Configurable {
   }
 
   /*
+   * Returns the TTL value of the ElasticSearch index in milliseconds
+   * when the TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w" (lowercase).
+   * For an unknown specifier, or a value that does not start with digits,
+   * 0 is returned. When no specifier is provided the value defaults to days,
+   * i.e. the integer parsed from the user's TTL string is converted from days.
+   * <p>
+   *     Elasticsearch supports ttl values being provided in the format: 1d / 1w / 1ms / 1s / 1h / 1m,
+   *     i.e. with a time unit like d (days), m (minutes), h (hours), ms (milliseconds) or w (weeks).
+   *     Elasticsearch itself uses milliseconds as the default unit; this method defaults to days.
+   *     See http://www.elasticsearch.org/guide/reference/mapping/ttl-field/.
+   * @param   ttl TTL value provided by user in flume configuration file for the sink
+   * @return  the ttl value in milliseconds, or 0 if the format is not recognized
+  */
+  private long parseTTL(String ttl){
+    matcher = matcher.reset(ttl);
+    while (matcher.find()) {
+      if (matcher.group(2).equals("ms")) {
+        return Long.parseLong(matcher.group(1));
+      } else if (matcher.group(2).equals("s")) {
+        return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("m")) {
+        return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("h")) {
+        return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("d")) {
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("w")) {
+        return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1)));
+      } else if (matcher.group(2).equals("")) {
+        logger.info("TTL qualifier is empty. Defaulting to day qualifier.");
+        return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1)));
+      } else {
+        logger.debug("Unknown TTL qualifier provided. Setting TTL to 0.");
+        return 0;
+      }
+    }
+    logger.info("TTL not provided. Skipping the TTL config by returning 0.");
+    return 0;
+  }
+
+  /*
    * FOR TESTING ONLY...
    *
    * Opens a local discovery node for talking to an elasticsearch server running

http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
index 7f75e22..dd0c59d 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java
@@ -78,4 +78,5 @@ public class ElasticSearchSinkConstants {
   public static final String DEFAULT_INDEX_NAME = "flume";
   public static final String DEFAULT_INDEX_TYPE = "log";
   public static final String DEFAULT_CLUSTER_NAME = "elasticsearch";
+  public static final String TTL_REGEX = "^(\\d+)(\\D*)";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/6dfe63cd/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index 3f2ec6e..71789e8 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -31,6 +31,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
@@ -286,6 +288,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
   }
 
+  @Test
+  public void shouldParseFullyQualifiedTTLs(){
+    Map<String, Long> testTTLMap = new HashMap<String, Long>();
+    testTTLMap.put("1ms", Long.valueOf(1));
+    testTTLMap.put("1s", Long.valueOf(1000));
+    testTTLMap.put("1m", Long.valueOf(60000));
+    testTTLMap.put("1h", Long.valueOf(3600000));
+    testTTLMap.put("1d", Long.valueOf(86400000));
+    testTTLMap.put("1w", Long.valueOf(604800000));
+    testTTLMap.put("1",  Long.valueOf(86400000));
+
+    parameters.put(HOSTNAMES, "10.5.5.27");
+    parameters.put(CLUSTER_NAME, "testing-cluster-name");
+    parameters.put(INDEX_NAME, "testing-index-name");
+    parameters.put(INDEX_TYPE, "testing-index-type");
+
+    for (String ttl : testTTLMap.keySet()) {
+      parameters.put(TTL, ttl);
+      fixture = new ElasticSearchSink();
+      fixture.configure(new Context(parameters));
+
+      InetSocketTransportAddress[] expected = {new InetSocketTransportAddress(
+        "10.5.5.27", DEFAULT_PORT)};
+
+      assertEquals("testing-cluster-name", fixture.getClusterName());
+      assertEquals("testing-index-name", fixture.getIndexName());
+      assertEquals("testing-index-type", fixture.getIndexType());
+      System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl)));
+      assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs());
+      assertArrayEquals(expected, fixture.getServerAddresses());
+
+    }
+  }
+
   public static final class CustomElasticSearchIndexRequestBuilderFactory
       extends AbstractElasticSearchIndexRequestBuilderFactory {