You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/06/24 21:44:25 UTC
git commit: adding search request source as JSON field in
configuration
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-111 [created] c73dadd74
adding search request source as JSON field in configuration
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c73dadd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c73dadd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c73dadd7
Branch: refs/heads/STREAMS-111
Commit: c73dadd7418bba8e783e9589327029fec4b23970
Parents: a33c215
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Jun 24 14:29:19 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Jun 24 14:29:19 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 14 +++++++++
.../elasticsearch/ElasticsearchQuery.java | 31 +++++++++++++++++---
.../ElasticsearchReaderConfiguration.json | 5 ++++
3 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..d27a34d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -20,10 +20,13 @@ package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
/**
* Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
@@ -59,6 +62,17 @@ public class ElasticsearchConfigurator {
elasticsearchReaderConfiguration.setIndexes(indexes);
elasticsearchReaderConfiguration.setTypes(types);
+ if( elasticsearch.hasPath("_search") ) {
+ LOGGER.info("_search supplied by config");
+ Config searchConfig = elasticsearch.getConfig("_search");
+ try {
+ elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
+ } catch (IOException e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse _search supplied by config");
+ }
+ }
+
return elasticsearchReaderConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 05b1686..393aa19 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -18,17 +18,21 @@
package org.apache.streams.elasticsearch;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.base.Objects;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -47,7 +51,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
private ElasticsearchClientManager elasticsearchClientManager;
- private ElasticsearchConfiguration config;
+ private ElasticsearchReaderConfiguration config;
private List<String> indexes = Lists.newArrayList();
private List<String> types = Lists.newArrayList();
private String[] withfields;
@@ -67,9 +71,11 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
private long totalHits = 0;
private long totalRead = 0;
+ private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
public ElasticsearchQuery() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
}
public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
@@ -123,14 +129,31 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
// If we haven't already set up the search, then set up the search.
if (search == null) {
+
search = elasticsearchClientManager.getClient()
.prepareSearch(indexes.toArray(new String[0]))
.setSearchType(SearchType.SCAN)
.setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
.setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+ String searchJson;
+ if( config.getSearch() != null ) {
+ LOGGER.info("Have config in Reader: %s", config.getSearch().toString());
+
+ try {
+ searchJson = mapper.writeValueAsString(config.getSearch());
+ LOGGER.info("Setting source: %s", searchJson);
+ search = search.setSource(searchJson);
+
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Could not apply _search supplied by config");
+ e.printStackTrace();
+ }
+ }
+
+
if (this.queryBuilder != null)
- search.setQuery(this.queryBuilder);
+ search = search.setQuery(this.queryBuilder);
// If the types are null, then don't specify a type
if (this.types != null && this.types.size() > 0)
@@ -150,7 +173,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
if (clauses > 0) {
// search.setPostFilter(allFilters);
- search.setPostFilter(allFilters);
+ search = search.setPostFilter(allFilters);
}
// TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 1f1c720..500430a 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -19,6 +19,11 @@
"type": "string"
},
"description": "Types to read from"
+ },
+ "_search": {
+ "type": "object",
+ "javaType" : "java.util.Map",
+ "description": "Search definition"
}
}
}
\ No newline at end of file