You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/11 18:31:00 UTC

git commit: percolate implementation, untested

Repository: incubator-streams
Updated Branches:
  refs/heads/percolater [created] 4e9a2bb57


percolate implementation, untested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4e9a2bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4e9a2bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4e9a2bb5

Branch: refs/heads/percolater
Commit: 4e9a2bb5757ccb156fbe41756c41459f9d74e0fa
Parents: 38ed41a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Jul 11 00:49:51 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/PercolateProcessor.java       | 205 +++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       |  20 +-
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
  * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
  */
 
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
 
+    public String TAGS_EXTENSION = "tags";
+
     private ElasticsearchWriterConfiguration config;
     private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
 
-    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
-        this.inQueue = inQueue;
-        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    public PercolateProcessor(ElasticsearchConfiguration config) {
+        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.manager = manager;
     }
 
-    public ElasticsearchWriterConfiguration getConfig() {
+    public ElasticsearchConfiguration getConfig() {
         return config;
     }
 
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.config = config;
     }
 
-    public void start() {
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-    }
-
-    public void stop() {
-
-    }
-
     public Queue<StreamsDatum> getProcessorOutputQueue() {
         return outQueue;
     }
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
             json = node.asText();
         }
 
-        PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
 
         ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
         for (PercolateResponse.Match match : response.getMatches()) {
             tagArray.add(match.getId().string());
-
         }
 
-        // need utility methods for get / create specific node
-        ObjectNode extensions = (ObjectNode) node.get("extensions");
-        ObjectNode w2o = (ObjectNode) extensions.get("w2o");
-        w2o.put("tags", tagArray);
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
 
         result.add(entry);
 
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
 
     @Override
     public void prepare(Object o) {
-        start();
+        // Validate wiring, then replace this index's percolator queries with one query per configured tag.
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().size() > 0);
+        bulkBuilder = manager.getClient().prepareBulk(); // was never assigned, so addPercolateRule() hit a null builder
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        deleteOldQueries(config.getIndex());
+        for (Tag tag : config.getTags()) {
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules())
+            LOGGER.info("wrote {} tags to _percolator", bulkBuilder.numberOfActions());
+        else
+            LOGGER.error("FAILED writing {} tags to _percolator", bulkBuilder.numberOfActions());
+
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        manager.getClient().close();
     }
 
-    @Override
-    public void run() {
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
 
-        while (true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+    }
 
-                Thread.sleep(new Random().nextInt(100));
+    /**
+     * Executes the accumulated bulk request and logs every failed item; throws RuntimeException if no rules were added.
+     * @return returns true if all rules were added. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() <= 0) { // "< 0" could never fire: the number of actions is a count
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                LOGGER.error("{}\t{}", r.getId(), r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
 
-                for (StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
+    /**
+     * Bulk-deletes the given ids, one prepareDelete("_percolator", index, id) per id; sends nothing when ids is empty.
+     * @param ids ids of the percolator entries to delete
+     * @param index value passed as the second argument of each prepareDelete("_percolator", index, id) call
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed, or that ids was empty.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
 
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
 
-            } catch (Exception e) {
-                e.printStackTrace();
+    /**
+     * Fetches the ids returned by getActivePercolateTags(index) and removes them from "_percolator" in one bulk request.
+     * @param index passed to getActivePercolateTags and as the second argument of each prepareDelete call
+     * @return true if the bulk delete reported no failures; false if it reported failures or no active tags were found
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+    public static class PercolateQueryBuilder {
+
+        private BoolQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.boolQuery();
+        }
 
+        public void setMinumumNumberShouldMatch(int shouldMatch) {
+            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+        }
+
+
+        public void addQuery(String query, FilterLevel level, String... fields) {
+            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+            if(fields != null && fields.length > 0) {
+                for(String field : fields) {
+                    builder.field(field);
+                }
             }
+            switch (level) {
+                case MUST:
+                    this.queryBuilder.must(builder);
+                    break;
+                case SHOULD:
+                    this.queryBuilder.should(builder);
+                    break;
+                case MUST_NOT:
+                    this.queryBuilder.mustNot(builder);
+            }
+        }
+
+        public String getId() {
+            return this.id;
         }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+
     }
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
 		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
-		}
+		},
+        "tags": {
+            "type": "array",
+            "description": "Tags to apply",
+            "items": {
+                "type": "object",
+                "description": "Tag to apply",
+                "properties": {
+                    "id": {
+                        "type": "string",
+                        "description": "Tag identifier"
+                    },
+                    "query": {
+                        "type": "string",
+                        "description": "Tag query"
+                    }
+                }
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.data.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
  * Utility class for managing activities
  */
 public class ActivityUtil {
+
     private ActivityUtil() {}
 
     /**
@@ -58,6 +61,8 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
     /**
      * Creates a standard extension property
      * @param activity activity to create the property in