You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/06/24 21:44:25 UTC

git commit: adding search request source as JSON field in configuration

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-111 [created] c73dadd74


adding search request source as JSON field in configuration


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c73dadd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c73dadd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c73dadd7

Branch: refs/heads/STREAMS-111
Commit: c73dadd7418bba8e783e9589327029fec4b23970
Parents: a33c215
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Jun 24 14:29:19 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Jun 24 14:29:19 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 14 +++++++++
 .../elasticsearch/ElasticsearchQuery.java       | 31 +++++++++++++++++---
 .../ElasticsearchReaderConfiguration.json       |  5 ++++
 3 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..d27a34d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -20,10 +20,13 @@ package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
@@ -59,6 +62,17 @@ public class ElasticsearchConfigurator {
         elasticsearchReaderConfiguration.setIndexes(indexes);
         elasticsearchReaderConfiguration.setTypes(types);
 
+        if( elasticsearch.hasPath("_search") ) {
+            LOGGER.info("_search supplied by config");
+            Config searchConfig = elasticsearch.getConfig("_search");
+            try {
+                elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
+            } catch (IOException e) {
+                e.printStackTrace();
+                LOGGER.warn("Could not parse _search supplied by config");
+            }
+        }
+
         return elasticsearchReaderConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 05b1686..393aa19 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -18,17 +18,21 @@
 
 package org.apache.streams.elasticsearch;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import com.google.common.base.Objects;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -47,7 +51,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
 
     private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchConfiguration config;
+    private ElasticsearchReaderConfiguration config;
     private List<String> indexes = Lists.newArrayList();
     private List<String> types = Lists.newArrayList();
     private String[] withfields;
@@ -67,9 +71,11 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     private long totalHits = 0;
     private long totalRead = 0;
 
+    private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
     public ElasticsearchQuery() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
     }
 
     public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
@@ -123,14 +129,31 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
         // If we haven't already set up the search, then set up the search.
         if (search == null) {
+
             search = elasticsearchClientManager.getClient()
                     .prepareSearch(indexes.toArray(new String[0]))
                     .setSearchType(SearchType.SCAN)
                     .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
                     .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
 
+            String searchJson;
+            if( config.getSearch() != null ) {
+                LOGGER.info("Have config in Reader: %s", config.getSearch().toString());
+
+                try {
+                    searchJson = mapper.writeValueAsString(config.getSearch());
+                    LOGGER.info("Setting source: %s", searchJson);
+                    search = search.setSource(searchJson);
+
+                } catch (JsonProcessingException e) {
+                    LOGGER.warn("Could not apply _search supplied by config");
+                    e.printStackTrace();
+                }
+            }
+
+
             if (this.queryBuilder != null)
-                search.setQuery(this.queryBuilder);
+                search = search.setQuery(this.queryBuilder);
 
             // If the types are null, then don't specify a type
             if (this.types != null && this.types.size() > 0)
@@ -150,7 +173,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
             if (clauses > 0) {
                 //    search.setPostFilter(allFilters);
-                search.setPostFilter(allFilters);
+                search = search.setPostFilter(allFilters);
             }
 
             // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 1f1c720..500430a 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -19,6 +19,11 @@
                 "type": "string"
             },
             "description": "Types to read from"
+        },
+        "_search": {
+            "type": "object",
+            "javaType" : "java.util.Map",
+            "description": "Search definition"
         }
     }
 }
\ No newline at end of file