You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/11 18:31:00 UTC
git commit: percolate implementation, untested
Repository: incubator-streams
Updated Branches:
refs/heads/percolater [created] 4e9a2bb57
percolate implementation, untested
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4e9a2bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4e9a2bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4e9a2bb5
Branch: refs/heads/percolater
Commit: 4e9a2bb5757ccb156fbe41756c41459f9d74e0fa
Parents: 38ed41a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Jul 11 00:49:51 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/PercolateProcessor.java | 205 +++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 20 +-
.../apache/streams/data/util/ActivityUtil.java | 5 +
3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
* [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
*/
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
+ public String TAGS_EXTENSION = "tags";
+
private ElasticsearchWriterConfiguration config;
private ElasticsearchClientManager manager;
+ private BulkRequestBuilder bulkBuilder;
- public PercolateProcessor(Queue<StreamsDatum> inQueue) {
- this.inQueue = inQueue;
- this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ public PercolateProcessor(ElasticsearchConfiguration config) {
+ manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.manager = manager;
}
- public ElasticsearchWriterConfiguration getConfig() {
+ public ElasticsearchConfiguration getConfig() {
return config;
}
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.config = config;
}
- public void start() {
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- }
-
- public void stop() {
-
- }
-
public Queue<StreamsDatum> getProcessorOutputQueue() {
return outQueue;
}
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
json = node.asText();
}
- PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+ PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
for (PercolateResponse.Match match : response.getMatches()) {
tagArray.add(match.getId().string());
-
}
- // need utility methods for get / create specific node
- ObjectNode extensions = (ObjectNode) node.get("extensions");
- ObjectNode w2o = (ObjectNode) extensions.get("w2o");
- w2o.put("tags", tagArray);
+ Activity activity = mapper.convertValue(node, Activity.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
result.add(entry);
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
+ /**
+  * Validates the wiring of this processor and (re)installs one percolator query per configured tag.
+  *
+  * <p>Queries already registered in {@code _percolator} for the configured index are deleted first.
+  * Each configured tag is then queued as a {@code MUST} query-string query against {@code _all}
+  * and all queued rules are written with a single bulk request.
+  *
+  * @param o configuration object handed in by the caller; not read by this implementation
+  * @throws NullPointerException if the manager, its client, the config or its tag list is missing
+  * @throws IllegalArgumentException if the tag list is empty
+  */
@Override
public void prepare(Object o) {
- start();
+
+     Preconditions.checkNotNull(manager);
+     Preconditions.checkNotNull(manager.getClient());
+     Preconditions.checkNotNull(config);
+     Preconditions.checkNotNull(config.getTags());
+     Preconditions.checkArgument(config.getTags().size() > 0);
+
+     // consider using mapping to figure out what fields are included in _all
+     //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+     // The bulk request has to exist before addPercolateRule() queues anything into it;
+     // without this assignment the first addPercolateRule() call dereferences null.
+     bulkBuilder = manager.getClient().prepareBulk();
+
+     deleteOldQueries(config.getIndex());
+     for (Tag tag : config.getTags()) {
+         PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+         queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+         addPercolateRule(queryBuilder, config.getIndex());
+     }
+     if (writePercolateRules()) {
+         LOGGER.info("wrote {} tags to _percolator", bulkBuilder.numberOfActions());
+     } else {
+         LOGGER.error("FAILED writing {} tags to _percolator", bulkBuilder.numberOfActions());
+     }
+
}
@Override
public void cleanUp() {
- stop();
+ manager.getClient().close(); // closes the Elasticsearch client returned by the manager
}
- @Override
- public void run() {
+ /**
+  * Reports how many actions are currently queued in the pending bulk request.
+  *
+  * @return the action count reported by the bulk builder
+  */
+ public int numOfPercolateRules() {
+     final int queued = bulkBuilder.numberOfActions();
+     return queued;
+ }
- while (true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
+ /**
+  * Queues one percolator rule into the pending bulk request; nothing is sent until
+  * {@code writePercolateRules()} executes the request.
+  *
+  * @param builder supplies the rule id and its JSON source
+  * @param index   index name, used as the type inside {@code _percolator}
+  */
+ public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+     final String ruleId = builder.getId();
+     final String ruleSource = builder.getSource();
+     bulkBuilder.add(
+             manager.getClient()
+                     .prepareIndex("_percolator", index, ruleId)
+                     .setSource(ruleSource));
+ }
- Thread.sleep(new Random().nextInt(100));
+ /**
+  * Executes the pending bulk request that holds the queued percolator rules and logs every
+  * item that failed.
+  *
+  * @return true if all rules were added; false indicates one or more rules have failed
+  * @throws RuntimeException if no rules have been queued
+  */
+ public boolean writePercolateRules() {
+     // The action count is never negative, so "nothing queued" must be detected with <= 0;
+     // the former "< 0" check could never fire.
+     if (this.numOfPercolateRules() <= 0) {
+         throw new RuntimeException("No Rules Have been added!");
+     }
+     BulkResponse response = this.bulkBuilder.execute().actionGet();
+     for (BulkItemResponse item : response.getItems()) {
+         if (item.isFailed()) {
+             // Report through the class logger rather than stdout so failures reach the log.
+             LOGGER.error("Failed to write percolate rule {}: {}", item.getId(), item.getFailureMessage());
+         }
+     }
+     return !response.hasFailures();
+ }
- for (StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
+ /**
+  * Deletes the given percolator queries for an index with a single bulk request.
+  *
+  * @param ids   ids of the percolator queries to delete
+  * @param index index name, used as the type inside {@code _percolator}
+  * @return true if the bulk delete reported no failures; false if any delete failed, and also
+  *         false (without sending a request) when {@code ids} is empty
+  */
+ public boolean removeOldTags(Set<String> ids, String index) {
+     if (ids.isEmpty()) {
+         return false;
+     }
+     final BulkRequestBuilder deletions = manager.getClient().prepareBulk();
+     for (final String staleId : ids) {
+         deletions.add(manager.getClient().prepareDelete("_percolator", index, staleId));
+     }
+     final BulkResponse outcome = deletions.execute().actionGet();
+     return !outcome.hasFailures();
+ }
+ /**
+  * Collects the ids of the percolator queries currently stored for an index.
+  *
+  * <p>Runs a match-all search against {@code _percolator} restricted to the type named after
+  * the index. The search requests at most 1000 hits, so only ids from that page are returned.
+  *
+  * @param index index name, used as the type inside {@code _percolator}
+  * @return ids of the hits returned by the search; empty if there were none
+  */
+ public Set<String> getActivePercolateTags(String index) {
+     SearchResponse response = manager.getClient()
+             .prepareSearch("_percolator")
+             .setTypes(index)
+             .setSize(1000)
+             .setQuery(QueryBuilders.matchAllQuery())
+             .execute()
+             .actionGet();
+     Set<String> activeIds = new HashSet<String>();
+     for (SearchHit hit : response.getHits().getHits()) {
+         activeIds.add(hit.id());
+     }
+     return activeIds;
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ /**
+  * Removes every percolator query currently registered for an index.
+  *
+  * @param index index whose registered percolator queries are deleted
+  * @return true when the bulk delete reported no failures; false when a delete failed or when
+  *         no active queries were found for the index
+  */
+ public boolean deleteOldQueries(String index) {
+     final Set<String> staleIds = getActivePercolateTags(index);
+     if (staleIds.isEmpty()) {
+         LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+         return false;
+     }
+     LOGGER.info("Deleting {} tags.", staleIds.size());
+     // removeOldTags() issues the same bulk delete, so reuse it instead of repeating the loop.
+     return removeOldTags(staleIds, index);
+ }
+
+ /**
+  * Builds the bool query behind one percolator rule and renders it as the JSON source that is
+  * indexed into {@code _percolator}.
+  */
+ public static class PercolateQueryBuilder {
+
+     private BoolQueryBuilder queryBuilder;
+     private String id;
+
+     /**
+      * @param id identifier returned by {@link #getId()}
+      */
+     public PercolateQueryBuilder(String id) {
+         this.queryBuilder = QueryBuilders.boolQuery();
+         this.id = id;
+     }
+
+     /**
+      * Forwards the value to the underlying bool query's minimum-should-match setting.
+      *
+      * @param shouldMatch value passed through unchanged
+      */
+     public void setMinumumNumberShouldMatch(int shouldMatch) {
+         queryBuilder.minimumNumberShouldMatch(shouldMatch);
+     }
+
+     /**
+      * Adds a query-string clause to the bool query.
+      *
+      * @param query  query-string expression
+      * @param level  whether the clause is attached as must, should or must-not
+      * @param fields fields the clause is restricted to; may be null or empty
+      */
+     public void addQuery(String query, FilterLevel level, String... fields) {
+         QueryStringQueryBuilder clause = QueryBuilders.queryString(query);
+         if (fields != null) {
+             for (String field : fields) {
+                 clause.field(field);
+             }
}
+         switch (level) {
+             case MUST_NOT:
+                 queryBuilder.mustNot(clause);
+                 break;
+             case SHOULD:
+                 queryBuilder.should(clause);
+                 break;
+             case MUST:
+                 queryBuilder.must(clause);
+                 break;
+         }
+     }
+
+     /**
+      * @return the identifier given at construction
+      */
+     public String getId() {
+         return id;
}
+
+     /**
+      * @return the bool query wrapped in a {@code "query"} JSON object
+      */
+     public String getSource() {
+         return new StringBuilder("{ \n\"query\" : ")
+                 .append(queryBuilder.toString())
+                 .append("\n}")
+                 .toString();
+     }
+
}
+
+ /**
+  * Selects which bool-query clause list {@code PercolateQueryBuilder.addQuery} attaches a query to.
+  */
+ public enum FilterLevel {
+     MUST,
+     SHOULD,
+     MUST_NOT
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
},
"maxTimeBetweenFlushMs": {
"type": "integer"
- }
+ },
+ "tags": {
+ "type": "array",
+ "description": "Tags to apply",
+ "items": {
+ "type": "object",
+ "description": "Tag to apply",
+ "properties": {
+ "id": {
+ "type": "string",
+ "description": "Tag identifier"
+ },
+ "query": {
+ "type": "string",
+ "description": "Tag query"
+ }
+ }
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
package org.apache.streams.data.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
* Utility class for managing activities
*/
public class ActivityUtil {
+
private ActivityUtil() {}
/**
@@ -58,6 +61,8 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
/**
* Creates a standard extension property
* @param activity activity to create the property in