You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 21:57:16 UTC
[1/7] git commit: percolate implementation, untested
Repository: incubator-streams
Updated Branches:
refs/heads/master 8d9986af7 -> a538352fc
percolate implementation, untested
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4e9a2bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4e9a2bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4e9a2bb5
Branch: refs/heads/master
Commit: 4e9a2bb5757ccb156fbe41756c41459f9d74e0fa
Parents: 38ed41a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Fri Jul 11 00:49:51 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/PercolateProcessor.java | 205 +++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 20 +-
.../apache/streams/data/util/ActivityUtil.java | 5 +
3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
* [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
*/
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
+ public String TAGS_EXTENSION = "tags";
+
private ElasticsearchWriterConfiguration config;
private ElasticsearchClientManager manager;
+ private BulkRequestBuilder bulkBuilder;
- public PercolateProcessor(Queue<StreamsDatum> inQueue) {
- this.inQueue = inQueue;
- this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ public PercolateProcessor(ElasticsearchConfiguration config) {
+ manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.manager = manager;
}
- public ElasticsearchWriterConfiguration getConfig() {
+ public ElasticsearchConfiguration getConfig() {
return config;
}
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.config = config;
}
- public void start() {
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- }
-
- public void stop() {
-
- }
-
public Queue<StreamsDatum> getProcessorOutputQueue() {
return outQueue;
}
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
json = node.asText();
}
- PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+ PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
for (PercolateResponse.Match match : response.getMatches()) {
tagArray.add(match.getId().string());
-
}
- // need utility methods for get / create specific node
- ObjectNode extensions = (ObjectNode) node.get("extensions");
- ObjectNode w2o = (ObjectNode) extensions.get("w2o");
- w2o.put("tags", tagArray);
+ Activity activity = mapper.convertValue(node, Activity.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
result.add(entry);
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
@Override
public void prepare(Object o) {
- start();
+
+ Preconditions.checkNotNull(manager);
+ Preconditions.checkNotNull(manager.getClient());
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(config.getTags());
+ Preconditions.checkArgument(config.getTags().size() > 0);
+
+ // consider using mapping to figure out what fields are included in _all
+ //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+ deleteOldQueries(config.getIndex());
+ for (Tag tag : config.getTags()) {
+ PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+ queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+ addPercolateRule(queryBuilder, config.getIndex());
+ }
+ if (writePercolateRules() == true)
+ LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+ else
+ LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
}
@Override
public void cleanUp() {
- stop();
+ manager.getClient().close();
}
- @Override
- public void run() {
+ public int numOfPercolateRules() {
+ return this.bulkBuilder.numberOfActions();
+ }
- while (true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
+ public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+ this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+ }
- Thread.sleep(new Random().nextInt(100));
+ /**
+ *
+ * @return returns true if all rules were addded. False indicates one or more rules have failed.
+ */
+ public boolean writePercolateRules() {
+ if(this.numOfPercolateRules() < 0) {
+ throw new RuntimeException("No Rules Have been added!");
+ }
+ BulkResponse response = this.bulkBuilder.execute().actionGet();
+ for(BulkItemResponse r : response.getItems()) {
+ if(r.isFailed()) {
+ System.out.println(r.getId()+"\t"+r.getFailureMessage());
+ }
+ }
+ return !response.hasFailures();
+ }
- for (StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
+ /**
+ *
+ * @param ids
+ * @param index
+ * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+ */
+ public boolean removeOldTags(Set<String> ids, String index) {
+ if(ids.size() == 0) {
+ return false;
+ }
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String id : ids) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+ }
+ return !bulk.execute().actionGet().hasFailures();
+ }
+ public Set<String> getActivePercolateTags(String index) {
+ Set<String> tags = new HashSet<String>();
+ SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+ SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+ SearchHits hits = response.getHits();
+ for(SearchHit hit : hits.getHits()) {
+ tags.add(hit.id());
+ }
+ return tags;
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ /**
+ *
+ * @param index
+ * @return
+ */
+ public boolean deleteOldQueries(String index) {
+ Set<String> tags = getActivePercolateTags(index);
+ if(tags.size() == 0) {
+ LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+ return false;
+ }
+ LOGGER.info("Deleting {} tags.", tags.size());
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String tag : tags) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+ }
+ BulkResponse response =bulk.execute().actionGet();
+ return !response.hasFailures();
+ }
+
+ public static class PercolateQueryBuilder {
+
+ private BoolQueryBuilder queryBuilder;
+ private String id;
+
+ public PercolateQueryBuilder(String id) {
+ this.id = id;
+ this.queryBuilder = QueryBuilders.boolQuery();
+ }
+ public void setMinumumNumberShouldMatch(int shouldMatch) {
+ this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+ }
+
+
+ public void addQuery(String query, FilterLevel level, String... fields) {
+ QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+ if(fields != null && fields.length > 0) {
+ for(String field : fields) {
+ builder.field(field);
+ }
}
+ switch (level) {
+ case MUST:
+ this.queryBuilder.must(builder);
+ break;
+ case SHOULD:
+ this.queryBuilder.should(builder);
+ break;
+ case MUST_NOT:
+ this.queryBuilder.mustNot(builder);
+ }
+ }
+
+ public String getId() {
+ return this.id;
}
+
+ public String getSource() {
+ return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+ }
+
+
}
+
+ public enum FilterLevel {
+ MUST, SHOULD, MUST_NOT
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
},
"maxTimeBetweenFlushMs": {
"type": "integer"
- }
+ },
+ "tags": {
+ "type": "array",
+ "description": "Tags to apply",
+ "items": {
+ "type": "object",
+ "description": "Tag to apply",
+ "properties": {
+ "id": {
+ "type": "string",
+ "description": "Tag identifier"
+ },
+ "query": {
+ "type": "string",
+ "description": "Tag query"
+ }
+ }
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4e9a2bb5/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
package org.apache.streams.data.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
* Utility class for managing activities
*/
public class ActivityUtil {
+
private ActivityUtil() {}
/**
@@ -58,6 +61,8 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
/**
* Creates a standard extension property
* @param activity activity to create the property in
[2/7] git commit: percolate implementation, untested
Posted by sb...@apache.org.
percolate implementation, untested
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3d849d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3d849d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3d849d6
Branch: refs/heads/master
Commit: b3d849d65b4137c84f9ed12d86322d14758f9b3f
Parents: b7e4c34
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:39 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/PercolateProcessor.java | 205 +++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 20 +-
.../apache/streams/data/util/ActivityUtil.java | 5 +
3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
* [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
*/
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
+ public String TAGS_EXTENSION = "tags";
+
private ElasticsearchWriterConfiguration config;
private ElasticsearchClientManager manager;
+ private BulkRequestBuilder bulkBuilder;
- public PercolateProcessor(Queue<StreamsDatum> inQueue) {
- this.inQueue = inQueue;
- this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ public PercolateProcessor(ElasticsearchConfiguration config) {
+ manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.manager = manager;
}
- public ElasticsearchWriterConfiguration getConfig() {
+ public ElasticsearchConfiguration getConfig() {
return config;
}
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.config = config;
}
- public void start() {
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- }
-
- public void stop() {
-
- }
-
public Queue<StreamsDatum> getProcessorOutputQueue() {
return outQueue;
}
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
json = node.asText();
}
- PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+ PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
for (PercolateResponse.Match match : response.getMatches()) {
tagArray.add(match.getId().string());
-
}
- // need utility methods for get / create specific node
- ObjectNode extensions = (ObjectNode) node.get("extensions");
- ObjectNode w2o = (ObjectNode) extensions.get("w2o");
- w2o.put("tags", tagArray);
+ Activity activity = mapper.convertValue(node, Activity.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
result.add(entry);
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
@Override
public void prepare(Object o) {
- start();
+
+ Preconditions.checkNotNull(manager);
+ Preconditions.checkNotNull(manager.getClient());
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(config.getTags());
+ Preconditions.checkArgument(config.getTags().size() > 0);
+
+ // consider using mapping to figure out what fields are included in _all
+ //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+ deleteOldQueries(config.getIndex());
+ for (Tag tag : config.getTags()) {
+ PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+ queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+ addPercolateRule(queryBuilder, config.getIndex());
+ }
+ if (writePercolateRules() == true)
+ LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+ else
+ LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
}
@Override
public void cleanUp() {
- stop();
+ manager.getClient().close();
}
- @Override
- public void run() {
+ public int numOfPercolateRules() {
+ return this.bulkBuilder.numberOfActions();
+ }
- while (true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
+ public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+ this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+ }
- Thread.sleep(new Random().nextInt(100));
+ /**
+ *
+ * @return returns true if all rules were addded. False indicates one or more rules have failed.
+ */
+ public boolean writePercolateRules() {
+ if(this.numOfPercolateRules() < 0) {
+ throw new RuntimeException("No Rules Have been added!");
+ }
+ BulkResponse response = this.bulkBuilder.execute().actionGet();
+ for(BulkItemResponse r : response.getItems()) {
+ if(r.isFailed()) {
+ System.out.println(r.getId()+"\t"+r.getFailureMessage());
+ }
+ }
+ return !response.hasFailures();
+ }
- for (StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
+ /**
+ *
+ * @param ids
+ * @param index
+ * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+ */
+ public boolean removeOldTags(Set<String> ids, String index) {
+ if(ids.size() == 0) {
+ return false;
+ }
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String id : ids) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+ }
+ return !bulk.execute().actionGet().hasFailures();
+ }
+ public Set<String> getActivePercolateTags(String index) {
+ Set<String> tags = new HashSet<String>();
+ SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+ SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+ SearchHits hits = response.getHits();
+ for(SearchHit hit : hits.getHits()) {
+ tags.add(hit.id());
+ }
+ return tags;
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ /**
+ *
+ * @param index
+ * @return
+ */
+ public boolean deleteOldQueries(String index) {
+ Set<String> tags = getActivePercolateTags(index);
+ if(tags.size() == 0) {
+ LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+ return false;
+ }
+ LOGGER.info("Deleting {} tags.", tags.size());
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String tag : tags) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+ }
+ BulkResponse response =bulk.execute().actionGet();
+ return !response.hasFailures();
+ }
+
+ public static class PercolateQueryBuilder {
+
+ private BoolQueryBuilder queryBuilder;
+ private String id;
+
+ public PercolateQueryBuilder(String id) {
+ this.id = id;
+ this.queryBuilder = QueryBuilders.boolQuery();
+ }
+ public void setMinumumNumberShouldMatch(int shouldMatch) {
+ this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+ }
+
+
+ public void addQuery(String query, FilterLevel level, String... fields) {
+ QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+ if(fields != null && fields.length > 0) {
+ for(String field : fields) {
+ builder.field(field);
+ }
}
+ switch (level) {
+ case MUST:
+ this.queryBuilder.must(builder);
+ break;
+ case SHOULD:
+ this.queryBuilder.should(builder);
+ break;
+ case MUST_NOT:
+ this.queryBuilder.mustNot(builder);
+ }
+ }
+
+ public String getId() {
+ return this.id;
}
+
+ public String getSource() {
+ return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+ }
+
+
}
+
+ public enum FilterLevel {
+ MUST, SHOULD, MUST_NOT
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
},
"maxTimeBetweenFlushMs": {
"type": "integer"
- }
+ },
+ "tags": {
+ "type": "array",
+ "description": "Tags to apply",
+ "items": {
+ "type": "object",
+ "description": "Tag to apply",
+ "properties": {
+ "id": {
+ "type": "string",
+ "description": "Tag identifier"
+ },
+ "query": {
+ "type": "string",
+ "description": "Tag query"
+ }
+ }
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
package org.apache.streams.data.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
* Utility class for managing activities
*/
public class ActivityUtil {
+
private ActivityUtil() {}
/**
@@ -58,6 +61,8 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
/**
* Creates a standard extension property
* @param activity activity to create the property in
[4/7] git commit: Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134
Posted by sb...@apache.org.
Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134
Conflicts:
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f1bc422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f1bc422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f1bc422
Branch: refs/heads/master
Commit: 2f1bc422612d3d6030a48bd956ba9b0c1994855c
Parents: 7df265f 4e9a2bb
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 21:00:11 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 21:00:11 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[3/7] git commit: percolate processor - tested with streams-examples/elasticsearch-tag
Posted by sb...@apache.org.
percolate processor - tested with streams-examples/elasticsearch-tag
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7df265f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7df265f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7df265f5
Branch: refs/heads/master
Commit: 7df265f5436d167c0d80d3205b822f67c904bac1
Parents: b3d849d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 20:57:28 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:40 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 28 +-
.../ElasticsearchPersistUpdater.java | 6 +-
.../elasticsearch/PercolateProcessor.java | 294 ---------------
.../processor/PercolateTagProcessor.java | 353 +++++++++++++++++++
.../ElasticsearchWriterConfiguration.json | 22 +-
streams-pojo/pom.xml | 6 +
.../org/apache/streams/data/util/JsonUtil.java | 60 +++-
7 files changed, 424 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 26033de..1c66789 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -78,28 +78,14 @@ public class ElasticsearchConfigurator {
public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
- ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
- ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class);
-
- String index = elasticsearch.getString("index");
- String type = elasticsearch.getString("type");
- Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
-
- if( elasticsearch.hasPath("bulk"))
- elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
-
- if( elasticsearch.hasPath("batchSize"))
- elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
-
- if( elasticsearch.hasPath("batchBytes"))
- elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
-
-
- elasticsearchWriterConfiguration.setIndex(index);
- elasticsearchWriterConfiguration.setType(type);
- elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
-
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = null;
+ try {
+ elasticsearchWriterConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchWriterConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchwriterconfiguration");
+ }
return elasticsearchWriterConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..602172e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -128,14 +128,14 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private ElasticsearchConfiguration config;
+ private ElasticsearchWriterConfiguration config;
public ElasticsearchPersistUpdater() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
}
- public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
+ public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
this.config = config;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
deleted file mode 100644
index c5d9f4c..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose URL
- * ------------- ----------------------------------------------------------------
- * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class PercolateProcessor implements StreamsProcessor {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected Queue<StreamsDatum> inQueue;
- protected Queue<StreamsDatum> outQueue;
-
- public String TAGS_EXTENSION = "tags";
-
- private ElasticsearchWriterConfiguration config;
- private ElasticsearchClientManager manager;
- private BulkRequestBuilder bulkBuilder;
-
- public PercolateProcessor(ElasticsearchConfiguration config) {
- manager = new ElasticsearchClientManager(config);
- }
-
- public ElasticsearchClientManager getManager() {
- return manager;
- }
-
- public void setManager(ElasticsearchClientManager manager) {
- this.manager = manager;
- }
-
- public ElasticsearchConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(ElasticsearchWriterConfiguration config) {
- this.config = config;
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
-
- String json;
- ObjectNode node;
- // first check for valid json
- if (entry.getDocument() instanceof String) {
- json = (String) entry.getDocument();
- try {
- node = (ObjectNode) mapper.readTree(json);
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- } else {
- node = (ObjectNode) entry.getDocument();
- json = node.asText();
- }
-
- PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
-
- ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
-
- for (PercolateResponse.Match match : response.getMatches()) {
- tagArray.add(match.getId().string());
- }
-
- Activity activity = mapper.convertValue(node, Activity.class);
-
- Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
- extensions.put(TAGS_EXTENSION, tagArray);
-
- activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
-
- result.add(entry);
-
- return result;
-
- }
-
- @Override
- public void prepare(Object o) {
-
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(config.getTags());
- Preconditions.checkArgument(config.getTags().size() > 0);
-
- // consider using mapping to figure out what fields are included in _all
- //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
-
- deleteOldQueries(config.getIndex());
- for (Tag tag : config.getTags()) {
- PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
- queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
- addPercolateRule(queryBuilder, config.getIndex());
- }
- if (writePercolateRules() == true)
- LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
- else
- LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
-
-
- }
-
- @Override
- public void cleanUp() {
- manager.getClient().close();
- }
-
- public int numOfPercolateRules() {
- return this.bulkBuilder.numberOfActions();
- }
-
- public void addPercolateRule(PercolateQueryBuilder builder, String index) {
- this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
- }
-
- /**
- *
- * @return returns true if all rules were addded. False indicates one or more rules have failed.
- */
- public boolean writePercolateRules() {
- if(this.numOfPercolateRules() < 0) {
- throw new RuntimeException("No Rules Have been added!");
- }
- BulkResponse response = this.bulkBuilder.execute().actionGet();
- for(BulkItemResponse r : response.getItems()) {
- if(r.isFailed()) {
- System.out.println(r.getId()+"\t"+r.getFailureMessage());
- }
- }
- return !response.hasFailures();
- }
-
- /**
- *
- * @param ids
- * @param index
- * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
- */
- public boolean removeOldTags(Set<String> ids, String index) {
- if(ids.size() == 0) {
- return false;
- }
- BulkRequestBuilder bulk = manager.getClient().prepareBulk();
- for(String id : ids) {
- bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
- }
- return !bulk.execute().actionGet().hasFailures();
- }
-
- public Set<String> getActivePercolateTags(String index) {
- Set<String> tags = new HashSet<String>();
- SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
- SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
- SearchHits hits = response.getHits();
- for(SearchHit hit : hits.getHits()) {
- tags.add(hit.id());
- }
- return tags;
- }
-
- /**
- *
- * @param index
- * @return
- */
- public boolean deleteOldQueries(String index) {
- Set<String> tags = getActivePercolateTags(index);
- if(tags.size() == 0) {
- LOGGER.warn("No active tags were found in _percolator for index : {}", index);
- return false;
- }
- LOGGER.info("Deleting {} tags.", tags.size());
- BulkRequestBuilder bulk = manager.getClient().prepareBulk();
- for(String tag : tags) {
- bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
- }
- BulkResponse response =bulk.execute().actionGet();
- return !response.hasFailures();
- }
-
- public static class PercolateQueryBuilder {
-
- private BoolQueryBuilder queryBuilder;
- private String id;
-
- public PercolateQueryBuilder(String id) {
- this.id = id;
- this.queryBuilder = QueryBuilders.boolQuery();
- }
-
- public void setMinumumNumberShouldMatch(int shouldMatch) {
- this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
- }
-
-
- public void addQuery(String query, FilterLevel level, String... fields) {
- QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
- if(fields != null && fields.length > 0) {
- for(String field : fields) {
- builder.field(field);
- }
- }
- switch (level) {
- case MUST:
- this.queryBuilder.must(builder);
- break;
- case SHOULD:
- this.queryBuilder.should(builder);
- break;
- case MUST_NOT:
- this.queryBuilder.mustNot(builder);
- }
- }
-
- public String getId() {
- return this.id;
- }
-
- public String getSource() {
- return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
- }
-
-
- }
-
- public enum FilterLevel {
- MUST, SHOULD, MUST_NOT
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
new file mode 100644
index 0000000..075d2b2
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.percolate.PercolateRequestBuilder;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Tags activities with the ids of the Elasticsearch percolator queries they match.
+ *
+ * <p>{@link #prepare(Object)} registers every entry of the configured tags map
+ * (tag id to query-string query) as a {@code .percolator} document in the configured index.
+ * {@link #process(StreamsDatum)} percolates each document against that index and writes the
+ * matching tag ids as an array under the activity extension named by {@link #TAGS_EXTENSION}.
+ * {@link #cleanUp()} deletes the {@code .percolator} documents found in the index (at most
+ * 1000, see {@link #getActivePercolateTags(String)}) and closes the client.
+ */
+public class PercolateTagProcessor implements StreamsProcessor {
+
+    public static final String STREAMS_ID = "PercolateTagProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> inQueue;
+    protected Queue<StreamsDatum> outQueue;
+
+    /** Key in the activity extensions map under which matching tag ids are stored. */
+    public String TAGS_EXTENSION = "tags";
+
+    private ElasticsearchWriterConfiguration config;
+    private ElasticsearchClientManager manager;
+    /** Bulk request that collects the percolator rules; created in {@link #prepare(Object)}. */
+    private BulkRequestBuilder bulkBuilder;
+
+    public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+        manager = new ElasticsearchClientManager(config);
+    }
+
+    public ElasticsearchClientManager getManager() {
+        return manager;
+    }
+
+    public void setManager(ElasticsearchClientManager manager) {
+        this.manager = manager;
+    }
+
+    public ElasticsearchConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Percolates one document and attaches the ids of the matching queries as tags.
+     *
+     * @param entry datum whose document is a JSON object, either as a {@code String}
+     *              or as an {@link ObjectNode}
+     * @return a one-element list holding {@code entry}, whose document has been replaced by the
+     *         tagged {@link Activity}; {@code null} if the document cannot be read or the
+     *         percolate request fails
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        Object document = entry.getDocument();
+        String json;
+        ObjectNode node;
+        if (document instanceof String) {
+            json = (String) document;
+            com.fasterxml.jackson.databind.JsonNode parsed;
+            try {
+                parsed = mapper.readTree(json);
+            } catch (IOException e) {
+                LOGGER.warn("Invalid JSON document: {}", e.getMessage());
+                return null;
+            }
+            // Arrays, scalars and empty input are not percolatable documents; reject them
+            // here instead of failing later with a ClassCastException.
+            if (!(parsed instanceof ObjectNode)) {
+                LOGGER.warn("Document is not a JSON object: {}", json);
+                return null;
+            }
+            node = (ObjectNode) parsed;
+        } else if (document instanceof ObjectNode) {
+            node = (ObjectNode) document;
+            try {
+                json = mapper.writeValueAsString(node);
+            } catch (JsonProcessingException e) {
+                LOGGER.warn("Invalid datum: {}", node);
+                return null;
+            }
+        } else {
+            LOGGER.warn("Incompatible document type: {}", document == null ? null : document.getClass());
+            return null;
+        }
+
+        String percolateRequestJson = "{ \"doc\": " + json + "}";
+
+        PercolateResponse response;
+        try {
+            LOGGER.trace("Percolate request json: {}", percolateRequestJson);
+            PercolateRequestBuilder request = manager.getClient().preparePercolate()
+                    .setIndices(config.getIndex())
+                    .setDocumentType(config.getType())
+                    .setSource(percolateRequestJson);
+            // Guarded: serializing the request is costly and may fail; it must never run
+            // when trace is off, nor cause the datum to be dropped when it is on.
+            if (LOGGER.isTraceEnabled()) {
+                traceRequest(request);
+            }
+            response = request.execute().actionGet();
+            LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
+        } catch (Exception e) {
+            LOGGER.warn("Percolate exception: {}", e.getMessage());
+            return null;
+        }
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+        Iterator<PercolateResponse.Match> matchIterator = response.iterator();
+        while (matchIterator.hasNext()) {
+            tagArray.add(matchIterator.next().getId().string());
+        }
+        LOGGER.trace("Percolate matches: {}", tagArray);
+
+        Activity activity = mapper.convertValue(node, Activity.class);
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+        extensions.put(TAGS_EXTENSION, tagArray);
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+        entry.setDocument(activity);
+        result.add(entry);
+        return result;
+    }
+
+    /** Trace-logs the serialized percolate request; serialization failures are only logged. */
+    private void traceRequest(PercolateRequestBuilder request) {
+        try {
+            LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
+        } catch (JsonProcessingException e) {
+            LOGGER.trace("Percolate request could not be serialized: {}", e.getMessage());
+        }
+    }
+
+    /**
+     * Validates the configuration and registers one percolator query per configured tag.
+     *
+     * @param o unused
+     * @throws NullPointerException     if the manager, client, config or tags are missing
+     * @throws IllegalArgumentException if no tags are configured
+     */
+    @Override
+    public void prepare(Object o) {
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+
+        bulkBuilder = manager.getClient().prepareBulk();
+        for (String tag : config.getTags().getAdditionalProperties().keySet()) {
+            // NOTE(review): assumes each configured tag value is a query string.
+            String query = (String) config.getTags().getAdditionalProperties().get(tag);
+            addPercolateRule(new PercolateQueryBuilder(tag, query), config.getIndex());
+        }
+        if (writePercolateRules()) {
+            LOGGER.info("wrote {} tags to {} _percolator", bulkBuilder.numberOfActions(), config.getIndex());
+        } else {
+            LOGGER.error("FAILED writing {} tags to {} _percolator", bulkBuilder.numberOfActions(), config.getIndex());
+        }
+    }
+
+    /** Removes the percolator queries found in the configured index and closes the client. */
+    @Override
+    public void cleanUp() {
+        deleteOldQueries(config.getIndex());
+        manager.getClient().close();
+    }
+
+    /** @return the number of rules queued in the pending bulk request */
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
+
+    /** Queues {@code builder} as a {@code .percolator} document in {@code index}. */
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
+                .setSource(builder.getSource()));
+    }
+
+    /**
+     * Executes the queued bulk request of percolator rules.
+     *
+     * @return true if all rules were added; false if one or more rules failed
+     * @throws IllegalStateException if no rules have been queued
+     */
+    public boolean writePercolateRules() {
+        // An empty bulk request is rejected by Elasticsearch; fail fast with a clear message.
+        if (this.numOfPercolateRules() == 0) {
+            throw new IllegalStateException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for (BulkItemResponse r : response.getItems()) {
+            if (r.isFailed()) {
+                LOGGER.error("{}\t{}", r.getId(), r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
+
+    /**
+     * Deletes the given percolator queries from {@code index}.
+     *
+     * @param ids   ids of the {@code .percolator} documents to delete
+     * @param index index holding the percolator queries
+     * @return true if all of the old tags were removed; false if {@code ids} is empty or
+     *         one or more tags were not removed
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if (ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for (String id : ids) {
+            // Same index/type layout that addPercolateRule writes to.
+            bulk.add(manager.getClient().prepareDelete(index, ".percolator", id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
+
+    /**
+     * Lists the ids of the percolator queries registered in {@code index}.
+     * Only the first 1000 hits of a match-all search are returned.
+     */
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch(index).setTypes(".percolator").setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for (SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
+
+    /**
+     * Deletes the percolator queries reported by {@link #getActivePercolateTags(String)}.
+     *
+     * @param index index holding the percolator queries
+     * @return true if every delete succeeded; false if none were found or any delete failed
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if (tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for (String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete(index, ".percolator", tag));
+        }
+        BulkResponse response = bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+    /** A single percolator rule: an id plus a query-string query. */
+    public static class PercolateQueryBuilder {
+
+        private QueryStringQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id, String query) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.queryString(query);
+        }
+
+        public String getId() {
+            return this.id;
+        }
+
+        /** @return the percolator document source, i.e. {@code {"query": <query>}} */
+        public String getSource() {
+            return "{ \n\"query\" : " + this.queryBuilder.toString() + "\n}";
+        }
+
+    }
+
+    /** Clause level for combining queries; not used by this class at present. */
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index bf7b146..cd23fe2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -32,23 +32,13 @@
"maxTimeBetweenFlushMs": {
"type": "integer"
},
+ "script": {
+ "type": "string",
+ "description": "Script to execute during index"
+ },
"tags": {
- "type": "array",
- "description": "Tags to apply",
- "items": {
- "type": "object",
- "description": "Tag to apply",
- "properties": {
- "id": {
- "type": "string",
- "description": "Tag identifier"
- },
- "query": {
- "type": "string",
- "description": "Tag query"
- }
- }
- }
+ "type": "object",
+ "description": "Tags to apply during index"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index a3a12f6..ca6d953 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -85,6 +85,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
index d49ef2a..79ac555 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
@@ -24,9 +24,15 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import java.io.*;
-import java.util.List;
+import java.util.*;
/**
* JSON utilities
@@ -35,10 +41,10 @@ public class JsonUtil {
private JsonUtil() {}
- public static JsonNode jsonToJsonNode(String json) {
- ObjectMapper mapper = new ObjectMapper();
- JsonFactory factory = mapper.getFactory();
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static JsonFactory factory = mapper.getFactory();
+ public static JsonNode jsonToJsonNode(String json) {
JsonNode node;
try {
JsonParser jp = factory.createJsonParser(json);
@@ -50,7 +56,6 @@ public class JsonUtil {
}
public static String jsonNodeToJson(JsonNode node) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(node);
} catch (JsonProcessingException e) {
@@ -59,7 +64,6 @@ public class JsonUtil {
}
public static <T> T jsonToObject(String json, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, clazz);
} catch (IOException e) {
@@ -68,22 +72,18 @@ public class JsonUtil {
}
public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(node, clazz);
}
public static <T> JsonNode objectToJsonNode(T obj) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.valueToTree(obj);
}
public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(node, new TypeReference<List<T>>() {});
}
public static <T> String objectToJson(T object) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(object);
} catch (IOException e) {
@@ -96,7 +96,6 @@ public class JsonUtil {
}
public static JsonNode getFromFile(String filePath) {
- ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = mapper.getFactory(); // since 2.1 use mapper.getFactory() instead
JsonNode node = null;
@@ -123,4 +122,43 @@ public class JsonUtil {
return stream;
}
+
+    /**
+     * Returns the array at a dot-separated path beneath {@code node}, creating it if missing.
+     *
+     * <p>Intermediate path segments that are missing (or JSON {@code null}) are created as
+     * empty objects. A missing or null final segment becomes an empty array.
+     *
+     * @param node  object to navigate from
+     * @param field dot-separated path of the array, e.g. {@code "extensions.tags"}
+     * @return the existing or newly created array at {@code field}
+     * @throws IllegalArgumentException if a node on the path exists but has the wrong type
+     */
+    public static ArrayNode ensureArray(ObjectNode node, String field) {
+        List<String> path = Lists.newArrayList(Splitter.on('.').split(field));
+        // The last segment names the array; the segments before it name its parent objects.
+        String arrayName = path.remove(path.size() - 1);
+        ObjectNode parent = node;
+        for (String segment : path) {
+            parent = ensureChildObject(parent, segment);
+        }
+        JsonNode existing = parent.get(arrayName);
+        if (existing == null || existing.isNull()) {
+            return parent.putArray(arrayName);
+        }
+        if (!existing.isArray()) {
+            throw new IllegalArgumentException("'" + field + "' exists but is not an array");
+        }
+        return (ArrayNode) existing;
+    }
+
+    /**
+     * Returns the object at a dot-separated path beneath {@code node}, creating any missing
+     * (or JSON {@code null}) objects along the way.
+     *
+     * @param node  object to navigate from
+     * @param field dot-separated path of the object, e.g. {@code "extensions.meta"}
+     * @return the existing or newly created object at {@code field}
+     * @throws IllegalArgumentException if a node on the path exists but is not an object
+     */
+    public static ObjectNode ensureObject(ObjectNode node, String field) {
+        ObjectNode current = node;
+        for (String segment : Splitter.on('.').split(field)) {
+            current = ensureChildObject(current, segment);
+        }
+        return current;
+    }
+
+    /** Returns child {@code name} of {@code parent} as an object, creating it if missing or null. */
+    private static ObjectNode ensureChildObject(ObjectNode parent, String name) {
+        JsonNode child = parent.get(name);
+        if (child == null || child.isNull()) {
+            return parent.putObject(name);
+        }
+        if (!child.isObject()) {
+            throw new IllegalArgumentException("'" + name + "' exists but is not an object");
+        }
+        return (ObjectNode) child;
+    }
+
}
[6/7] git commit: added explicit check for ObjectNode document type
Posted by sb...@apache.org.
added explicit check for ObjectNode document type
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/99dc5320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/99dc5320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/99dc5320
Branch: refs/heads/master
Commit: 99dc5320651ea6036be803adad2d295f0214d2b6
Parents: 767f945
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Aug 5 10:35:08 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Aug 5 10:35:08 2014 -0500
----------------------------------------------------------------------
.../streams/elasticsearch/processor/PercolateTagProcessor.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/99dc5320/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index eddc99b..389f390 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -122,7 +122,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
e.printStackTrace();
return null;
}
- } else {
+ } else if (entry.getDocument() instanceof ObjectNode) {
node = (ObjectNode) entry.getDocument();
try {
json = mapper.writeValueAsString(node);
@@ -130,6 +130,9 @@ public class PercolateTagProcessor implements StreamsProcessor {
LOGGER.warn("Invalid datum: ", node);
return null;
}
+ } else {
+ LOGGER.warn("Incompatible document type: ", entry.getDocument().getClass());
+ return null;
}
StringBuilder percolateRequestJson = new StringBuilder();
[7/7] git commit: Merge remote-tracking branch 'apache/STREAMS-134'
Posted by sb...@apache.org.
Merge remote-tracking branch 'apache/STREAMS-134'
Conflicts:
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a538352f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a538352f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a538352f
Branch: refs/heads/master
Commit: a538352fcccd8b3d43152b7e1547d1f69b9ac928
Parents: 8d9986a 99dc532
Author: sblackmon <sb...@w2odigital.com>
Authored: Fri Aug 8 14:56:47 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Fri Aug 8 14:56:47 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 28 +-
.../elasticsearch/PercolateProcessor.java | 169 --------
.../processor/PercolateTagProcessor.java | 387 +++++++++++++++++++
.../ElasticsearchWriterConfiguration.json | 25 +-
streams-pojo/pom.xml | 6 +
.../apache/streams/data/util/ActivityUtil.java | 5 +
.../org/apache/streams/data/util/JsonUtil.java | 60 ++-
7 files changed, 471 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
[5/7] git commit: made processor serializable made config serializable
Posted by sb...@apache.org.
made processor serializable
made config serializable
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/767f9455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/767f9455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/767f9455
Branch: refs/heads/master
Commit: 767f945589727be131d3933132e9028f7cc3e008
Parents: 2f1bc42
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:07:59 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:07:59 2014 -0500
----------------------------------------------------------------------
.../processor/PercolateTagProcessor.java | 51 ++++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 17 ++++---
2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index 075d2b2..eddc99b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -33,9 +33,14 @@ import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -66,7 +71,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private ObjectMapper mapper;
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
@@ -79,7 +84,6 @@ public class PercolateTagProcessor implements StreamsProcessor {
public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
this.config = config;
- manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -159,11 +163,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
Activity activity = mapper.convertValue(node, Activity.class);
- Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
- extensions.put(TAGS_EXTENSION, tagArray);
-
- activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+ appendMatches(tagArray, activity);
entry.setDocument(activity);
@@ -173,11 +173,17 @@ public class PercolateTagProcessor implements StreamsProcessor {
}
+ protected void appendMatches(ArrayNode tagArray, Activity activity) {
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+ }
+
@Override
public void prepare(Object o) {
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(config.getTags());
Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
@@ -185,8 +191,11 @@ public class PercolateTagProcessor implements StreamsProcessor {
// consider using mapping to figure out what fields are included in _all
//manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
- //deleteOldQueries(config.getIndex());
+ mapper = StreamsJacksonMapper.getInstance();
+ manager = new ElasticsearchClientManager(config);
bulkBuilder = manager.getClient().prepareBulk();
+ createIndexIfMissing(config.getIndex());
+ deleteOldQueries(config.getIndex());
for (String tag : config.getTags().getAdditionalProperties().keySet()) {
String query = (String)config.getTags().getAdditionalProperties().get(tag);
PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
@@ -210,6 +219,28 @@ public class PercolateTagProcessor implements StreamsProcessor {
return this.bulkBuilder.numberOfActions();
}
+ public void createIndexIfMissing(String indexName) {
+ if (!this.manager.getClient()
+ .admin()
+ .indices()
+ .exists(new IndicesExistsRequest(indexName))
+ .actionGet()
+ .isExists()) {
+ // It does not exist... So we are going to need to create the index.
+ // we are going to assume that the 'templates' that we have loaded into
+ // elasticsearch are sufficient to ensure the index is being created properly.
+ CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+ if (response.isAcknowledged()) {
+ LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+ } else {
+ LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+ LOGGER.error("Error Message: {}", response.toString());
+ throw new RuntimeException("Unable to create index " + indexName);
+ }
+ }
+ }
+
public void addPercolateRule(PercolateQueryBuilder builder, String index) {
this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
.setSource(builder.getSource()));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index cd23fe2..26e3fa0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,20 +24,21 @@
"description": "Item Count before flush",
"default": 100
},
- "batchBytes": {
- "type": "integer",
- "description": "Number of bytes before flush",
- "default": 5242880
- },
- "maxTimeBetweenFlushMs": {
- "type": "integer"
- },
+ "batchBytes": {
+ "type": "integer",
+ "description": "Number of bytes before flush",
+ "default": 5242880
+ },
+ "maxTimeBetweenFlushMs": {
+ "type": "integer"
+ },
"script": {
"type": "string",
"description": "Script to execute during index"
},
"tags": {
"type": "object",
+ "javaInterfaces": ["java.io.Serializable"],
"description": "Tags to apply during index"
}
}