You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 21:57:16 UTC

[1/7] git commit: percolate implementation, untested

Repository: incubator-streams
Updated Branches:
  refs/heads/master 8d9986af7 -> a538352fc


percolate implementation, untested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4e9a2bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4e9a2bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4e9a2bb5

Branch: refs/heads/master
Commit: 4e9a2bb5757ccb156fbe41756c41459f9d74e0fa
Parents: 38ed41a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Jul 11 00:49:51 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/PercolateProcessor.java       | 205 +++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       |  20 +-
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
  * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
  */
 
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
 
+    public String TAGS_EXTENSION = "tags";
+
     private ElasticsearchWriterConfiguration config;
     private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
 
-    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
-        this.inQueue = inQueue;
-        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    public PercolateProcessor(ElasticsearchConfiguration config) {
+        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.manager = manager;
     }
 
-    public ElasticsearchWriterConfiguration getConfig() {
+    public ElasticsearchConfiguration getConfig() {
         return config;
     }
 
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.config = config;
     }
 
-    public void start() {
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-    }
-
-    public void stop() {
-
-    }
-
     public Queue<StreamsDatum> getProcessorOutputQueue() {
         return outQueue;
     }
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
             json = node.asText();
         }
 
-        PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
 
         ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
         for (PercolateResponse.Match match : response.getMatches()) {
             tagArray.add(match.getId().string());
-
         }
 
-        // need utility methods for get / create specific node
-        ObjectNode extensions = (ObjectNode) node.get("extensions");
-        ObjectNode w2o = (ObjectNode) extensions.get("w2o");
-        w2o.put("tags", tagArray);
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
 
         result.add(entry);
 
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
 
     @Override
     public void prepare(Object o) {
-        start();
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        deleteOldQueries(config.getIndex());
+        for (Tag tag : config.getTags()) {
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules() == true)
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+        else
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        manager.getClient().close();
     }
 
-    @Override
-    public void run() {
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
 
-        while (true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+    }
 
-                Thread.sleep(new Random().nextInt(100));
+    /**
+     *
+     * @return returns true if all rules were added. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 0) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                System.out.println(r.getId()+"\t"+r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
 
-                for (StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
+    /**
+     *
+     * @param ids
+     * @param index
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
 
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
 
-            } catch (Exception e) {
-                e.printStackTrace();
+    /**
+     *
+     * @param index
+     * @return
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+    public static class PercolateQueryBuilder {
+
+        private BoolQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.boolQuery();
+        }
 
+        public void setMinumumNumberShouldMatch(int shouldMatch) {
+            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+        }
+
+
+        public void addQuery(String query, FilterLevel level, String... fields) {
+            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+            if(fields != null && fields.length > 0) {
+                for(String field : fields) {
+                    builder.field(field);
+                }
             }
+            switch (level) {
+                case MUST:
+                    this.queryBuilder.must(builder);
+                    break;
+                case SHOULD:
+                    this.queryBuilder.should(builder);
+                    break;
+                case MUST_NOT:
+                    this.queryBuilder.mustNot(builder);
+            }
+        }
+
+        public String getId() {
+            return this.id;
         }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+
     }
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
 		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
-		}
+		},
+        "tags": {
+            "type": "array",
+            "description": "Tags to apply",
+            "items": {
+                "type": "object",
+                "description": "Tag to apply",
+                "properties": {
+                    "id": {
+                        "type": "string",
+                        "description": "Tag identifier"
+                    },
+                    "query": {
+                        "type": "string",
+                        "description": "Tag query"
+                    }
+                }
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.data.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
  * Utility class for managing activities
  */
 public class ActivityUtil {
+
     private ActivityUtil() {}
 
     /**
@@ -58,6 +61,8 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
     /**
      * Creates a standard extension property
      * @param activity activity to create the property in


[2/7] git commit: percolate implementation, untested

Posted by sb...@apache.org.
percolate implementation, untested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3d849d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3d849d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3d849d6

Branch: refs/heads/master
Commit: b3d849d65b4137c84f9ed12d86322d14758f9b3f
Parents: b7e4c34
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:39 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/PercolateProcessor.java       | 205 +++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       |  20 +-
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
  * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
  */
 
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
 
+    public String TAGS_EXTENSION = "tags";
+
     private ElasticsearchWriterConfiguration config;
     private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
 
-    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
-        this.inQueue = inQueue;
-        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    public PercolateProcessor(ElasticsearchConfiguration config) {
+        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.manager = manager;
     }
 
-    public ElasticsearchWriterConfiguration getConfig() {
+    public ElasticsearchConfiguration getConfig() {
         return config;
     }
 
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.config = config;
     }
 
-    public void start() {
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-    }
-
-    public void stop() {
-
-    }
-
     public Queue<StreamsDatum> getProcessorOutputQueue() {
         return outQueue;
     }
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
             json = node.asText();
         }
 
-        PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
 
         ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
         for (PercolateResponse.Match match : response.getMatches()) {
             tagArray.add(match.getId().string());
-
         }
 
-        // need utility methods for get / create specific node
-        ObjectNode extensions = (ObjectNode) node.get("extensions");
-        ObjectNode w2o = (ObjectNode) extensions.get("w2o");
-        w2o.put("tags", tagArray);
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
 
         result.add(entry);
 
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
 
     @Override
     public void prepare(Object o) {
-        start();
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        deleteOldQueries(config.getIndex());
+        for (Tag tag : config.getTags()) {
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules() == true)
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+        else
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        manager.getClient().close();
     }
 
-    @Override
-    public void run() {
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
 
-        while (true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+    }
 
-                Thread.sleep(new Random().nextInt(100));
+    /**
+     *
+     * @return returns true if all rules were added. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 0) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                System.out.println(r.getId()+"\t"+r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
 
-                for (StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
+    /**
+     *
+     * @param ids
+     * @param index
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
 
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
 
-            } catch (Exception e) {
-                e.printStackTrace();
+    /**
+     *
+     * @param index
+     * @return
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+    public static class PercolateQueryBuilder {
+
+        private BoolQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.boolQuery();
+        }
 
+        public void setMinumumNumberShouldMatch(int shouldMatch) {
+            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+        }
+
+
+        public void addQuery(String query, FilterLevel level, String... fields) {
+            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+            if(fields != null && fields.length > 0) {
+                for(String field : fields) {
+                    builder.field(field);
+                }
             }
+            switch (level) {
+                case MUST:
+                    this.queryBuilder.must(builder);
+                    break;
+                case SHOULD:
+                    this.queryBuilder.should(builder);
+                    break;
+                case MUST_NOT:
+                    this.queryBuilder.mustNot(builder);
+            }
+        }
+
+        public String getId() {
+            return this.id;
         }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+
     }
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
 		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
-		}
+		},
+        "tags": {
+            "type": "array",
+            "description": "Tags to apply",
+            "items": {
+                "type": "object",
+                "description": "Tag to apply",
+                "properties": {
+                    "id": {
+                        "type": "string",
+                        "description": "Tag identifier"
+                    },
+                    "query": {
+                        "type": "string",
+                        "description": "Tag query"
+                    }
+                }
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.data.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
  * Utility class for managing activities
  */
 public class ActivityUtil {
+
     private ActivityUtil() {}
 
     /**
@@ -58,6 +61,8 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
     /**
      * Creates a standard extension property
      * @param activity activity to create the property in


[4/7] git commit: Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134

Posted by sb...@apache.org.
Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134

Conflicts:
	streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
	streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f1bc422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f1bc422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f1bc422

Branch: refs/heads/master
Commit: 2f1bc422612d3d6030a48bd956ba9b0c1994855c
Parents: 7df265f 4e9a2bb
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 21:00:11 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 21:00:11 2014 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[3/7] git commit: percolate processor - tested with streams-examples/elasticsearch-tag

Posted by sb...@apache.org.
percolate processor - tested with streams-examples/elasticsearch-tag


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7df265f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7df265f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7df265f5

Branch: refs/heads/master
Commit: 7df265f5436d167c0d80d3205b822f67c904bac1
Parents: b3d849d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 20:57:28 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:40 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |  28 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../elasticsearch/PercolateProcessor.java       | 294 ---------------
 .../processor/PercolateTagProcessor.java        | 353 +++++++++++++++++++
 .../ElasticsearchWriterConfiguration.json       |  22 +-
 streams-pojo/pom.xml                            |   6 +
 .../org/apache/streams/data/util/JsonUtil.java  |  60 +++-
 7 files changed, 424 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 26033de..1c66789 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -78,28 +78,14 @@ public class ElasticsearchConfigurator {
 
     public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
 
-        ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class);
-
-        String index = elasticsearch.getString("index");
-        String type = elasticsearch.getString("type");
-        Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
-
-        if( elasticsearch.hasPath("bulk"))
-            elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
-
-        if( elasticsearch.hasPath("batchSize"))
-            elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
-
-        if( elasticsearch.hasPath("batchBytes"))
-            elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
-
-
-        elasticsearchWriterConfiguration.setIndex(index);
-        elasticsearchWriterConfiguration.setType(type);
-        elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
-
+        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = null;
 
+        try {
+            elasticsearchWriterConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchWriterConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchwriterconfiguration");
+        }
         return elasticsearchWriterConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..602172e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -128,14 +128,14 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
     private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private ElasticsearchConfiguration config;
+    private ElasticsearchWriterConfiguration config;
 
     public ElasticsearchPersistUpdater() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
     }
 
-    public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
+    public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
         this.config = config;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
deleted file mode 100644
index c5d9f4c..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose              URL
- * -------------        ----------------------------------------------------------------
- * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class PercolateProcessor implements StreamsProcessor {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected Queue<StreamsDatum> inQueue;
-    protected Queue<StreamsDatum> outQueue;
-
-    public String TAGS_EXTENSION = "tags";
-
-    private ElasticsearchWriterConfiguration config;
-    private ElasticsearchClientManager manager;
-    private BulkRequestBuilder bulkBuilder;
-
-    public PercolateProcessor(ElasticsearchConfiguration config) {
-        manager = new ElasticsearchClientManager(config);
-    }
-
-    public ElasticsearchClientManager getManager() {
-        return manager;
-    }
-
-    public void setManager(ElasticsearchClientManager manager) {
-        this.manager = manager;
-    }
-
-    public ElasticsearchConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(ElasticsearchWriterConfiguration config) {
-        this.config = config;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        String json;
-        ObjectNode node;
-        // first check for valid json
-        if (entry.getDocument() instanceof String) {
-            json = (String) entry.getDocument();
-            try {
-                node = (ObjectNode) mapper.readTree(json);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return null;
-            }
-        } else {
-            node = (ObjectNode) entry.getDocument();
-            json = node.asText();
-        }
-
-        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
-
-        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
-
-        for (PercolateResponse.Match match : response.getMatches()) {
-            tagArray.add(match.getId().string());
-        }
-
-        Activity activity = mapper.convertValue(node, Activity.class);
-
-        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
-        extensions.put(TAGS_EXTENSION, tagArray);
-
-        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
-
-        result.add(entry);
-
-        return result;
-
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(config.getTags());
-        Preconditions.checkArgument(config.getTags().size() > 0);
-
-        // consider using mapping to figure out what fields are included in _all
-        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
-
-        deleteOldQueries(config.getIndex());
-        for (Tag tag : config.getTags()) {
-            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
-            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
-            addPercolateRule(queryBuilder, config.getIndex());
-        }
-        if (writePercolateRules() == true)
-            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
-        else
-            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
-
-
-    }
-
-    @Override
-    public void cleanUp() {
-        manager.getClient().close();
-    }
-
-    public int numOfPercolateRules() {
-        return this.bulkBuilder.numberOfActions();
-    }
-
-    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
-        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
-    }
-
-    /**
-     *
-     * @return returns true if all rules were addded. False indicates one or more rules have failed.
-     */
-    public boolean writePercolateRules() {
-        if(this.numOfPercolateRules() < 0) {
-            throw new RuntimeException("No Rules Have been added!");
-        }
-        BulkResponse response = this.bulkBuilder.execute().actionGet();
-        for(BulkItemResponse r : response.getItems()) {
-            if(r.isFailed()) {
-                System.out.println(r.getId()+"\t"+r.getFailureMessage());
-            }
-        }
-        return !response.hasFailures();
-    }
-
-    /**
-     *
-     * @param ids
-     * @param index
-     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
-     */
-    public boolean removeOldTags(Set<String> ids, String index) {
-        if(ids.size() == 0) {
-            return false;
-        }
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String id : ids) {
-            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
-        }
-        return !bulk.execute().actionGet().hasFailures();
-    }
-
-    public Set<String> getActivePercolateTags(String index) {
-        Set<String> tags = new HashSet<String>();
-        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
-        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
-        SearchHits hits = response.getHits();
-        for(SearchHit hit : hits.getHits()) {
-            tags.add(hit.id());
-        }
-        return tags;
-    }
-
-    /**
-     *
-     * @param index
-     * @return
-     */
-    public boolean deleteOldQueries(String index) {
-        Set<String> tags = getActivePercolateTags(index);
-        if(tags.size() == 0) {
-            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
-            return false;
-        }
-        LOGGER.info("Deleting {} tags.", tags.size());
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String tag : tags) {
-            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
-        }
-        BulkResponse response =bulk.execute().actionGet();
-        return !response.hasFailures();
-    }
-
-    public static class PercolateQueryBuilder {
-
-        private BoolQueryBuilder queryBuilder;
-        private String id;
-
-        public PercolateQueryBuilder(String id) {
-            this.id = id;
-            this.queryBuilder = QueryBuilders.boolQuery();
-        }
-
-        public void setMinumumNumberShouldMatch(int shouldMatch) {
-            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
-        }
-
-
-        public void addQuery(String query, FilterLevel level, String... fields) {
-            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
-            if(fields != null && fields.length > 0) {
-                for(String field : fields) {
-                    builder.field(field);
-                }
-            }
-            switch (level) {
-                case MUST:
-                    this.queryBuilder.must(builder);
-                    break;
-                case SHOULD:
-                    this.queryBuilder.should(builder);
-                    break;
-                case MUST_NOT:
-                    this.queryBuilder.mustNot(builder);
-            }
-        }
-
-        public String getId() {
-            return this.id;
-        }
-
-        public String getSource() {
-            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
-        }
-
-
-    }
-
-    public enum FilterLevel {
-        MUST, SHOULD, MUST_NOT
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
new file mode 100644
index 0000000..075d2b2
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.percolate.PercolateRequestBuilder;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
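+ * Tags each datum with the ids of the percolator queries that match its document.
+ * References (NOTE: these links concern HTTP redirect handling, not percolation - confirm relevance):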
+ * Purpose              URL
+ * -------------        ----------------------------------------------------------------
+ * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class PercolateTagProcessor implements StreamsProcessor {
+
+    public static final String STREAMS_ID = "PercolateTagProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> inQueue;
+    protected Queue<StreamsDatum> outQueue;
+
+    public String TAGS_EXTENSION = "tags";
+
+    private ElasticsearchWriterConfiguration config;
+    private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
+
+    public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+        manager = new ElasticsearchClientManager(config);
+    }
+
+    public ElasticsearchClientManager getManager() {
+        return manager;
+    }
+
+    public void setManager(ElasticsearchClientManager manager) {
+        this.manager = manager;
+    }
+
+    public ElasticsearchConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        String json;
+        ObjectNode node;
+        // first check for valid json
+        if (entry.getDocument() instanceof String) {
+            json = (String) entry.getDocument();
+            try {
+                node = (ObjectNode) mapper.readTree(json);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        } else {
+            node = (ObjectNode) entry.getDocument();
+            try {
+                json = mapper.writeValueAsString(node);
+            } catch (JsonProcessingException e) {
+                LOGGER.warn("Invalid datum: ", node);
+                return null;
+            }
+        }
+
+        StringBuilder percolateRequestJson = new StringBuilder();
+        percolateRequestJson.append("{ \"doc\": ");
+        percolateRequestJson.append(json);
+        //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }");
+        percolateRequestJson.append("}");
+
+        PercolateRequestBuilder request;
+        PercolateResponse response;
+
+        try {
+            LOGGER.trace("Percolate request json: {}", percolateRequestJson.toString());
+            request = manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString());
+            LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
+            response = request.execute().actionGet();
+            LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
+        } catch (Exception e) {
+            LOGGER.warn("Percolate exception: {}", e.getMessage());
+            return null;
+        }
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+
+        Iterator<PercolateResponse.Match> matchIterator = response.iterator();
+        while(matchIterator.hasNext()) {
+            tagArray.add(matchIterator.next().getId().string());
+        }
+
+        LOGGER.trace("Percolate matches: {}", tagArray);
+
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+        entry.setDocument(activity);
+
+        result.add(entry);
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        //deleteOldQueries(config.getIndex());
+        bulkBuilder = manager.getClient().prepareBulk();
+        for (String tag : config.getTags().getAdditionalProperties().keySet()) {
+            String query = (String)config.getTags().getAdditionalProperties().get(tag);
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules() == true)
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+        else
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+
+
+    }
+
+    @Override
+    public void cleanUp() {
+        deleteOldQueries(config.getIndex());
+        manager.getClient().close();
+    }
+
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
+
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
+                .setSource(builder.getSource()));
+    }
+
+    /**
+     *
+     * @return returns true if all rules were added. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 0) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                System.out.println(r.getId()+"\t"+r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
+
+    /**
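+     * Bulk-deletes the given tag ids via prepareDelete("_percolator", index, id).
+     * @param ids ids of the tags to delete; an empty set returns false without issuing a request
+     * @param index value passed as the second argument of prepareDelete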
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
+
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
+
+    /**
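+     * Deletes the percolator queries returned by getActivePercolateTags for the given index.
+     * @param index index whose ".percolator" documents are removed
+     * @return true if the bulk delete reported no failures; false if it did or if no tags were found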
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+//    public static class PercolateQueryBuilder {
+//
+//        private BoolQueryBuilder queryBuilder;
+//        private String id;
+//
+//        public PercolateQueryBuilder(String id) {
+//            this.id = id;
+//            this.queryBuilder = QueryBuilders.boolQuery();
+//        }
+//
+//        public void setMinumumNumberShouldMatch(int shouldMatch) {
+//            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+//        }
+//
+//
+//        public void addQuery(String query, FilterLevel level, String... fields) {
+//            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+//            if(fields != null && fields.length > 0) {
+//                for(String field : fields) {
+//                    builder.field(field);
+//                }
+//            }
+//            switch (level) {
+//                case MUST:
+//                    this.queryBuilder.must(builder);
+//                    break;
+//                case SHOULD:
+//                    this.queryBuilder.should(builder);
+//                    break;
+//                case MUST_NOT:
+//                    this.queryBuilder.mustNot(builder);
+//            }
+//        }
+//
+//        public String getId() {
+//            return this.id;
+//        }
+//
+//        public String getSource() {
+//            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+//        }
+//
+//
+//    }
+
+    public static class PercolateQueryBuilder {
+
+        private QueryStringQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id, String query) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.queryString(query);
+        }
+
+        public String getId() {
+            return this.id;
+        }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+    }
+
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index bf7b146..cd23fe2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -32,23 +32,13 @@
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
 		},
+        "script": {
+            "type": "string",
+            "description": "Script to execute during index"
+        },
         "tags": {
-            "type": "array",
-            "description": "Tags to apply",
-            "items": {
-                "type": "object",
-                "description": "Tag to apply",
-                "properties": {
-                    "id": {
-                        "type": "string",
-                        "description": "Tag identifier"
-                    },
-                    "query": {
-                        "type": "string",
-                        "description": "Tag query"
-                    }
-                }
-            }
+            "type": "object",
+            "description": "Tags to apply during index"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index a3a12f6..ca6d953 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -85,6 +85,12 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
index d49ef2a..79ac555 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
@@ -24,9 +24,15 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import java.io.*;
-import java.util.List;
+import java.util.*;
 
 /**
  * JSON utilities
@@ -35,10 +41,10 @@ public class JsonUtil {
 
     private JsonUtil() {}
 
-    public static JsonNode jsonToJsonNode(String json) {
-        ObjectMapper mapper = new ObjectMapper();
-        JsonFactory factory = mapper.getFactory();
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private static JsonFactory factory = mapper.getFactory();
 
+    public static JsonNode jsonToJsonNode(String json) {
         JsonNode node;
         try {
             JsonParser jp = factory.createJsonParser(json);
@@ -50,7 +56,6 @@ public class JsonUtil {
     }
 
     public static String jsonNodeToJson(JsonNode node) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.writeValueAsString(node);
         } catch (JsonProcessingException e) {
@@ -59,7 +64,6 @@ public class JsonUtil {
     }
 
     public static <T> T jsonToObject(String json, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.readValue(json, clazz);
         } catch (IOException e) {
@@ -68,22 +72,18 @@ public class JsonUtil {
     }
 
     public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.convertValue(node, clazz);
     }
 
     public static <T> JsonNode objectToJsonNode(T obj) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.valueToTree(obj);
     }
 
     public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.convertValue(node, new TypeReference<List<T>>() {});
     }
 
     public static <T> String objectToJson(T object) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.writeValueAsString(object);
         } catch (IOException e) {
@@ -96,7 +96,6 @@ public class JsonUtil {
     }
 
     public static JsonNode getFromFile(String filePath) {
-        ObjectMapper mapper = new ObjectMapper();
         JsonFactory factory = mapper.getFactory(); // since 2.1 use mapper.getFactory() instead
 
         JsonNode node = null;
@@ -123,4 +122,43 @@ public class JsonUtil {
 
         return stream;
     }
+
+    /**
+     * Returns the array found at a dot-delimited path beneath {@code node},
+     * creating it, and any missing intermediate objects, when absent.
+     * <p>
+     * For example, {@code ensureArray(root, "a.b.tags")} guarantees that
+     * {@code root.a} and {@code root.a.b} are objects and that
+     * {@code root.a.b.tags} is an array, then returns that array. A path
+     * segment that is missing or holds a JSON null is treated as absent.
+     *
+     * @param node object to create the array within
+     * @param field dot-delimited location of the array, relative to {@code node}
+     * @return the existing or newly created array at {@code field}
+     * @throws ClassCastException if an intermediate segment already holds a
+     *         non-object value, or the final segment holds a non-array value
+     */
+    public static ArrayNode ensureArray(ObjectNode node, String field) {
+        // Splitter always yields at least one segment, so the leaf index is >= 0.
+        List<String> path = Lists.newArrayList(Splitter.on('.').split(field));
+        int leafIndex = path.size() - 1;
+
+        // Walk every segment except the last, creating containers as needed.
+        ObjectNode parent = node;
+        for (int i = 0; i < leafIndex; i++) {
+            String segment = path.get(i);
+            JsonNode child = parent.get(segment);
+            if (child == null || child.isNull()) {
+                parent = parent.putObject(segment);
+            } else {
+                parent = (ObjectNode) child;
+            }
+        }
+
+        String leaf = path.get(leafIndex);
+        JsonNode existing = parent.get(leaf);
+        if (existing == null || existing.isNull()) {
+            return parent.putArray(leaf);
+        }
+        return (ArrayNode) existing;
+    }
+
+    /**
+     * Returns the object found at a dot-delimited path beneath {@code node},
+     * creating it, and any missing intermediate objects, when absent.
+     * <p>
+     * For example, {@code ensureObject(root, "a.b")} guarantees that
+     * {@code root.a} and {@code root.a.b} are objects and returns
+     * {@code root.a.b}. A path segment that is missing or holds a JSON null
+     * is treated as absent.
+     *
+     * @param node objectnode to create the object within
+     * @param field dot-delimited location of the object, relative to {@code node}
+     * @return the existing or newly created object at {@code field}
+     * @throws ClassCastException if any segment of the path already holds a
+     *         non-object value
+     */
+    public static ObjectNode ensureObject(ObjectNode node, String field) {
+        ObjectNode current = node;
+        // Descend one segment at a time; the last segment visited is the result.
+        for (String segment : Splitter.on('.').split(field)) {
+            JsonNode child = current.get(segment);
+            if (child == null || child.isNull()) {
+                current = current.putObject(segment);
+            } else {
+                current = (ObjectNode) child;
+            }
+        }
+        return current;
+    }
+
 }


[6/7] git commit: added explicit check for ObjectNode document type

Posted by sb...@apache.org.
added explicit check for ObjectNode document type


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/99dc5320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/99dc5320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/99dc5320

Branch: refs/heads/master
Commit: 99dc5320651ea6036be803adad2d295f0214d2b6
Parents: 767f945
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Aug 5 10:35:08 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Aug 5 10:35:08 2014 -0500

----------------------------------------------------------------------
 .../streams/elasticsearch/processor/PercolateTagProcessor.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99dc5320/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index eddc99b..389f390 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -122,7 +122,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
                 e.printStackTrace();
                 return null;
             }
-        } else {
+        } else if (entry.getDocument() instanceof ObjectNode) {
             node = (ObjectNode) entry.getDocument();
             try {
                 json = mapper.writeValueAsString(node);
@@ -130,6 +130,9 @@ public class PercolateTagProcessor implements StreamsProcessor {
                 LOGGER.warn("Invalid datum: ", node);
                 return null;
             }
+        } else {
+            LOGGER.warn("Incompatible document type: ", entry.getDocument().getClass());
+            return null;
         }
 
         StringBuilder percolateRequestJson = new StringBuilder();


[7/7] git commit: Merge remote-tracking branch 'apache/STREAMS-134'

Posted by sb...@apache.org.
Merge remote-tracking branch 'apache/STREAMS-134'

Conflicts:
	streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a538352f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a538352f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a538352f

Branch: refs/heads/master
Commit: a538352fcccd8b3d43152b7e1547d1f69b9ac928
Parents: 8d9986a 99dc532
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Aug 8 14:56:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Aug 8 14:56:47 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |  28 +-
 .../elasticsearch/PercolateProcessor.java       | 169 --------
 .../processor/PercolateTagProcessor.java        | 387 +++++++++++++++++++
 .../ElasticsearchWriterConfiguration.json       |  25 +-
 streams-pojo/pom.xml                            |   6 +
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 .../org/apache/streams/data/util/JsonUtil.java  |  60 ++-
 7 files changed, 471 insertions(+), 209 deletions(-)
----------------------------------------------------------------------



[5/7] git commit: made processor serializable made config serializable

Posted by sb...@apache.org.
made processor serializable
made config serializable


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/767f9455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/767f9455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/767f9455

Branch: refs/heads/master
Commit: 767f945589727be131d3933132e9028f7cc3e008
Parents: 2f1bc42
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:07:59 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:07:59 2014 -0500

----------------------------------------------------------------------
 .../processor/PercolateTagProcessor.java        | 51 ++++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       | 17 ++++---
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index 075d2b2..eddc99b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -33,9 +33,14 @@ import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.percolate.PercolateRequestBuilder;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -66,7 +71,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private ObjectMapper mapper;
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
@@ -79,7 +84,6 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
         this.config = config;
-        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -159,11 +163,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
         Activity activity = mapper.convertValue(node, Activity.class);
 
-        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
-        extensions.put(TAGS_EXTENSION, tagArray);
-
-        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+        appendMatches(tagArray, activity);
 
         entry.setDocument(activity);
 
@@ -173,11 +173,17 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     }
 
+    /**
+     * Stores the matched tags on the activity under the {@code TAGS_EXTENSION}
+     * key of its extensions map, then writes that map back onto the activity.
+     *
+     * @param tagArray tags to record
+     * @param activity activity to annotate; modified in place
+     */
+    protected void appendMatches(ArrayNode tagArray, Activity activity) {
+        Map<String, Object> ext = ActivityUtil.ensureExtensions(activity);
+        ext.put(TAGS_EXTENSION, tagArray);
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, ext);
+    }
+
     @Override
     public void prepare(Object o) {
 
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
         Preconditions.checkNotNull(config);
         Preconditions.checkNotNull(config.getTags());
         Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
@@ -185,8 +191,11 @@ public class PercolateTagProcessor implements StreamsProcessor {
         // consider using mapping to figure out what fields are included in _all
         //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
 
-        //deleteOldQueries(config.getIndex());
+        mapper = StreamsJacksonMapper.getInstance();
+        manager = new ElasticsearchClientManager(config);
         bulkBuilder = manager.getClient().prepareBulk();
+        createIndexIfMissing(config.getIndex());
+        deleteOldQueries(config.getIndex());
         for (String tag : config.getTags().getAdditionalProperties().keySet()) {
             String query = (String)config.getTags().getAdditionalProperties().get(tag);
             PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
@@ -210,6 +219,28 @@ public class PercolateTagProcessor implements StreamsProcessor {
         return this.bulkBuilder.numberOfActions();
     }
 
+    public void createIndexIfMissing(String indexName) {
+        if (!this.manager.getClient()
+                .admin()
+                .indices()
+                .exists(new IndicesExistsRequest(indexName))
+                .actionGet()
+                .isExists()) {
+            // It does not exist... So we are going to need to create the index.
+            // we are going to assume that the 'templates' that we have loaded into
+            // elasticsearch are sufficient to ensure the index is being created properly.
+            CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+            if (response.isAcknowledged()) {
+                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+            } else {
+                LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+                LOGGER.error("Error Message: {}", response.toString());
+                throw new RuntimeException("Unable to create index " + indexName);
+            }
+        }
+    }
+
     public void addPercolateRule(PercolateQueryBuilder builder, String index) {
         this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
                 .setSource(builder.getSource()));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index cd23fe2..26e3fa0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,20 +24,21 @@
             "description": "Item Count before flush",
             "default": 100
         },
-		"batchBytes": {
-			"type": "integer",
-			"description": "Number of bytes before flush",
-			"default": 5242880
-		},
-		"maxTimeBetweenFlushMs": {
-			"type": "integer"
-		},
+        "batchBytes": {
+            "type": "integer",
+            "description": "Number of bytes before flush",
+            "default": 5242880
+        },
+        "maxTimeBetweenFlushMs": {
+            "type": "integer"
+        },
         "script": {
             "type": "string",
             "description": "Script to execute during index"
         },
         "tags": {
             "type": "object",
+            "javaInterfaces": ["java.io.Serializable"],
             "description": "Tags to apply during index"
         }
     }