You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 21:57:20 UTC
[5/7] git commit: made processor serializable; made config serializable
made processor serializable
made config serializable
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/767f9455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/767f9455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/767f9455
Branch: refs/heads/master
Commit: 767f945589727be131d3933132e9028f7cc3e008
Parents: 2f1bc42
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:07:59 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:07:59 2014 -0500
----------------------------------------------------------------------
.../processor/PercolateTagProcessor.java | 51 ++++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 17 ++++---
2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index 075d2b2..eddc99b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -33,9 +33,14 @@ import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.percolate.PercolateRequestBuilder;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -66,7 +71,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private ObjectMapper mapper;
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
@@ -79,7 +84,6 @@ public class PercolateTagProcessor implements StreamsProcessor {
public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
this.config = config;
- manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -159,11 +163,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
Activity activity = mapper.convertValue(node, Activity.class);
- Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
- extensions.put(TAGS_EXTENSION, tagArray);
-
- activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+ appendMatches(tagArray, activity);
entry.setDocument(activity);
@@ -173,11 +173,17 @@ public class PercolateTagProcessor implements StreamsProcessor {
}
+ protected void appendMatches(ArrayNode tagArray, Activity activity) {
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+ }
+
@Override
public void prepare(Object o) {
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(config.getTags());
Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
@@ -185,8 +191,11 @@ public class PercolateTagProcessor implements StreamsProcessor {
// consider using mapping to figure out what fields are included in _all
//manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
- //deleteOldQueries(config.getIndex());
+ mapper = StreamsJacksonMapper.getInstance();
+ manager = new ElasticsearchClientManager(config);
bulkBuilder = manager.getClient().prepareBulk();
+ createIndexIfMissing(config.getIndex());
+ deleteOldQueries(config.getIndex());
for (String tag : config.getTags().getAdditionalProperties().keySet()) {
String query = (String)config.getTags().getAdditionalProperties().get(tag);
PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
@@ -210,6 +219,28 @@ public class PercolateTagProcessor implements StreamsProcessor {
return this.bulkBuilder.numberOfActions();
}
+ public void createIndexIfMissing(String indexName) {
+ if (!this.manager.getClient()
+ .admin()
+ .indices()
+ .exists(new IndicesExistsRequest(indexName))
+ .actionGet()
+ .isExists()) {
+ // It does not exist... So we are going to need to create the index.
+ // we are going to assume that the 'templates' that we have loaded into
+ // elasticsearch are sufficient to ensure the index is being created properly.
+ CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+ if (response.isAcknowledged()) {
+ LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+ } else {
+ LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+ LOGGER.error("Error Message: {}", response.toString());
+ throw new RuntimeException("Unable to create index " + indexName);
+ }
+ }
+ }
+
public void addPercolateRule(PercolateQueryBuilder builder, String index) {
this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
.setSource(builder.getSource()));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index cd23fe2..26e3fa0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,20 +24,21 @@
"description": "Item Count before flush",
"default": 100
},
- "batchBytes": {
- "type": "integer",
- "description": "Number of bytes before flush",
- "default": 5242880
- },
- "maxTimeBetweenFlushMs": {
- "type": "integer"
- },
+ "batchBytes": {
+ "type": "integer",
+ "description": "Number of bytes before flush",
+ "default": 5242880
+ },
+ "maxTimeBetweenFlushMs": {
+ "type": "integer"
+ },
"script": {
"type": "string",
"description": "Script to execute during index"
},
"tags": {
"type": "object",
+ "javaInterfaces": ["java.io.Serializable"],
"description": "Tags to apply during index"
}
}