You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/08 21:57:20 UTC

[5/7] git commit: made processor serializable made config serializable

made processor serializable
made config serializable


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/767f9455
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/767f9455
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/767f9455

Branch: refs/heads/master
Commit: 767f945589727be131d3933132e9028f7cc3e008
Parents: 2f1bc42
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:07:59 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:07:59 2014 -0500

----------------------------------------------------------------------
 .../processor/PercolateTagProcessor.java        | 51 ++++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       | 17 ++++---
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index 075d2b2..eddc99b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -33,9 +33,14 @@ import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.percolate.PercolateRequestBuilder;
 import org.elasticsearch.action.percolate.PercolateResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -66,7 +71,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private ObjectMapper mapper;
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
@@ -79,7 +84,6 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
         this.config = config;
-        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -159,11 +163,7 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
         Activity activity = mapper.convertValue(node, Activity.class);
 
-        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
-        extensions.put(TAGS_EXTENSION, tagArray);
-
-        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+        appendMatches(tagArray, activity);
 
         entry.setDocument(activity);
 
@@ -173,11 +173,17 @@ public class PercolateTagProcessor implements StreamsProcessor {
 
     }
 
+    protected void appendMatches(ArrayNode tagArray, Activity activity) {
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+    }
+
     @Override
     public void prepare(Object o) {
 
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
         Preconditions.checkNotNull(config);
         Preconditions.checkNotNull(config.getTags());
         Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
@@ -185,8 +191,11 @@ public class PercolateTagProcessor implements StreamsProcessor {
         // consider using mapping to figure out what fields are included in _all
         //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
 
-        //deleteOldQueries(config.getIndex());
+        mapper = StreamsJacksonMapper.getInstance();
+        manager = new ElasticsearchClientManager(config);
         bulkBuilder = manager.getClient().prepareBulk();
+        createIndexIfMissing(config.getIndex());
+        deleteOldQueries(config.getIndex());
         for (String tag : config.getTags().getAdditionalProperties().keySet()) {
             String query = (String)config.getTags().getAdditionalProperties().get(tag);
             PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
@@ -210,6 +219,28 @@ public class PercolateTagProcessor implements StreamsProcessor {
         return this.bulkBuilder.numberOfActions();
     }
 
+    public void createIndexIfMissing(String indexName) {
+        if (!this.manager.getClient()
+                .admin()
+                .indices()
+                .exists(new IndicesExistsRequest(indexName))
+                .actionGet()
+                .isExists()) {
+            // It does not exist... So we are going to need to create the index.
+            // we are going to assume that the 'templates' that we have loaded into
+            // elasticsearch are sufficient to ensure the index is being created properly.
+            CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+            if (response.isAcknowledged()) {
+                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+            } else {
+                LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+                LOGGER.error("Error Message: {}", response.toString());
+                throw new RuntimeException("Unable to create index " + indexName);
+            }
+        }
+    }
+
     public void addPercolateRule(PercolateQueryBuilder builder, String index) {
         this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
                 .setSource(builder.getSource()));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/767f9455/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index cd23fe2..26e3fa0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,20 +24,21 @@
             "description": "Item Count before flush",
             "default": 100
         },
-		"batchBytes": {
-			"type": "integer",
-			"description": "Number of bytes before flush",
-			"default": 5242880
-		},
-		"maxTimeBetweenFlushMs": {
-			"type": "integer"
-		},
+        "batchBytes": {
+            "type": "integer",
+            "description": "Number of bytes before flush",
+            "default": 5242880
+        },
+        "maxTimeBetweenFlushMs": {
+            "type": "integer"
+        },
         "script": {
             "type": "string",
             "description": "Script to execute during index"
         },
         "tags": {
             "type": "object",
+            "javaInterfaces": ["java.io.Serializable"],
             "description": "Tags to apply during index"
         }
     }