You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/07 17:09:57 UTC

[1/5] git commit: adding search request source as JSON field in configuration

Repository: incubator-streams
Updated Branches:
  refs/heads/master 0380a28c4 -> c50ce9175


adding search request source as JSON field in configuration


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c73dadd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c73dadd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c73dadd7

Branch: refs/heads/master
Commit: c73dadd7418bba8e783e9589327029fec4b23970
Parents: a33c215
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Jun 24 14:29:19 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Jun 24 14:29:19 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 14 +++++++++
 .../elasticsearch/ElasticsearchQuery.java       | 31 +++++++++++++++++---
 .../ElasticsearchReaderConfiguration.json       |  5 ++++
 3 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..d27a34d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -20,10 +20,13 @@ package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
@@ -59,6 +62,17 @@ public class ElasticsearchConfigurator {
         elasticsearchReaderConfiguration.setIndexes(indexes);
         elasticsearchReaderConfiguration.setTypes(types);
 
+        if( elasticsearch.hasPath("_search") ) {
+            LOGGER.info("_search supplied by config");
+            Config searchConfig = elasticsearch.getConfig("_search");
+            try {
+                elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
+            } catch (IOException e) {
+                e.printStackTrace();
+                LOGGER.warn("Could not parse _search supplied by config");
+            }
+        }
+
         return elasticsearchReaderConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 05b1686..393aa19 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -18,17 +18,21 @@
 
 package org.apache.streams.elasticsearch;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import com.google.common.base.Objects;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -47,7 +51,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
 
     private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchConfiguration config;
+    private ElasticsearchReaderConfiguration config;
     private List<String> indexes = Lists.newArrayList();
     private List<String> types = Lists.newArrayList();
     private String[] withfields;
@@ -67,9 +71,11 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     private long totalHits = 0;
     private long totalRead = 0;
 
+    private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
     public ElasticsearchQuery() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
     }
 
     public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
@@ -123,14 +129,31 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
         // If we haven't already set up the search, then set up the search.
         if (search == null) {
+
             search = elasticsearchClientManager.getClient()
                     .prepareSearch(indexes.toArray(new String[0]))
                     .setSearchType(SearchType.SCAN)
                     .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
                     .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
 
+            String searchJson;
+            if( config.getSearch() != null ) {
+                LOGGER.info("Have config in Reader: %s", config.getSearch().toString());
+
+                try {
+                    searchJson = mapper.writeValueAsString(config.getSearch());
+                    LOGGER.info("Setting source: %s", searchJson);
+                    search = search.setSource(searchJson);
+
+                } catch (JsonProcessingException e) {
+                    LOGGER.warn("Could not apply _search supplied by config");
+                    e.printStackTrace();
+                }
+            }
+
+
             if (this.queryBuilder != null)
-                search.setQuery(this.queryBuilder);
+                search = search.setQuery(this.queryBuilder);
 
             // If the types are null, then don't specify a type
             if (this.types != null && this.types.size() > 0)
@@ -150,7 +173,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
             if (clauses > 0) {
                 //    search.setPostFilter(allFilters);
-                search.setPostFilter(allFilters);
+                search = search.setPostFilter(allFilters);
             }
 
             // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c73dadd7/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 1f1c720..500430a 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -19,6 +19,11 @@
                 "type": "string"
             },
             "description": "Types to read from"
+        },
+        "_search": {
+            "type": "object",
+            "javaType" : "java.util.Map",
+            "description": "Search definition"
         }
     }
 }
\ No newline at end of file


[2/5] git commit: better debug logging

Posted by sb...@apache.org.
better debug logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9582c0cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9582c0cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9582c0cb

Branch: refs/heads/master
Commit: 9582c0cb703f075b218016b0dac2e85708a71d14
Parents: c73dadd
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jun 25 13:45:13 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jun 25 13:45:13 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/elasticsearch/ElasticsearchQuery.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9582c0cb/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 393aa19..475d93c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -138,17 +138,20 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
             String searchJson;
             if( config.getSearch() != null ) {
-                LOGGER.info("Have config in Reader: %s", config.getSearch().toString());
+                LOGGER.info("Have config in Reader: " + config.getSearch().toString());
 
                 try {
                     searchJson = mapper.writeValueAsString(config.getSearch());
-                    LOGGER.info("Setting source: %s", searchJson);
+                    LOGGER.info("Setting source: " + searchJson);
                     search = search.setSource(searchJson);
 
                 } catch (JsonProcessingException e) {
                     LOGGER.warn("Could not apply _search supplied by config");
                     e.printStackTrace();
                 }
+
+                LOGGER.info("Search Source is now " + search.toString());
+
             }
 
 


[5/5] git commit: Merge branch 'STREAMS-116'

Posted by sb...@apache.org.
Merge branch 'STREAMS-116'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c50ce917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c50ce917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c50ce917

Branch: refs/heads/master
Commit: c50ce9175456ea92597918a44c4e9f33e77fcb4e
Parents: 0380a28 738e7ca
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Jul 7 10:09:25 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Jul 7 10:09:25 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/README.md     | 38 ++++++++++++++++++++
 .../ElasticsearchConfigurator.java              | 14 ++++++++
 .../elasticsearch/ElasticsearchQuery.java       | 34 +++++++++++++++---
 .../ElasticsearchReaderConfiguration.json       |  5 +++
 .../src/main/resources/reference.json           |  9 +++++
 5 files changed, 95 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c50ce917/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------


[3/5] git commit: resolves STREAMS-116 adding README

Posted by sb...@apache.org.
resolves STREAMS-116
adding README


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b1f94a6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b1f94a6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b1f94a6b

Branch: refs/heads/master
Commit: b1f94a6b7ad9d88ffaf393953b8b246eb41364e1
Parents: 9582c0c
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jun 25 17:36:48 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jun 25 17:36:48 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/README.md     | 47 ++++++++++++++++++++
 .../elasticsearch/ElasticsearchQuery.java       |  8 ++--
 .../src/main/resources/reference.json           |  9 ++++
 3 files changed, 60 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b1f94a6b/streams-contrib/streams-persist-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/README.md b/streams-contrib/streams-persist-elasticsearch/README.md
new file mode 100644
index 0000000..fbdf86c
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/README.md
@@ -0,0 +1,47 @@
+streams-persist-elasticsearch
+=====================
+
+Read and write to Elasticsearch
+
+Example reader configuration:
+
+    "elasticsearch": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch",
+        "indexes": [
+            "sourceindex"
+        ],
+        "types": [
+            "sourcetype"
+        ],
+        "_search": {
+            "query" : {
+                "match_all" : { }
+            }
+        }
+    }
+
+Example writer configuration:
+
+    "elasticsearch": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch",
+        "index": "destinationindex",
+        "type": "destinationtype"
+    }
+
+For more examples, see:
+
+- [elasticsearch-backup](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-backup)
+- [elasticsearch-reindex](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-reindex)
+- [elasticsearch-restore](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-restore)
+- [mongo-elasticsearch-index](https://github.com/w2ogroup/streams-examples/tree/master/mongo-elasticsearch-index)
+- [twitter-gardenhose-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-gardenhose-elasticsearch)
+- [twitter-history-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-history-elasticsearch)
+- [twitter-userstream-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-userstream-elasticsearch)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b1f94a6b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 475d93c..55064d3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -138,19 +138,19 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
             String searchJson;
             if( config.getSearch() != null ) {
-                LOGGER.info("Have config in Reader: " + config.getSearch().toString());
+                LOGGER.debug("Have config in Reader: " + config.getSearch().toString());
 
                 try {
                     searchJson = mapper.writeValueAsString(config.getSearch());
-                    LOGGER.info("Setting source: " + searchJson);
-                    search = search.setSource(searchJson);
+                    LOGGER.debug("Setting source: " + searchJson);
+                    search = search.setExtraSource(searchJson);
 
                 } catch (JsonProcessingException e) {
                     LOGGER.warn("Could not apply _search supplied by config");
                     e.printStackTrace();
                 }
 
-                LOGGER.info("Search Source is now " + search.toString());
+                LOGGER.debug("Search Source is now " + search.toString());
 
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b1f94a6b/streams-contrib/streams-persist-elasticsearch/src/main/resources/reference.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/resources/reference.json b/streams-contrib/streams-persist-elasticsearch/src/main/resources/reference.json
new file mode 100644
index 0000000..408399c
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/resources/reference.json
@@ -0,0 +1,9 @@
+{
+    "elasticsearch": {
+        "hosts": [
+            "localhost"
+        ],
+        "port": 9300,
+        "clusterName": "elasticsearch"
+    }
+}
\ No newline at end of file


[4/5] git commit: PR 44 feedback

Posted by sb...@apache.org.
PR 44 feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/738e7cac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/738e7cac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/738e7cac

Branch: refs/heads/master
Commit: 738e7cac5c307fcf6dfcd451a08826b6ba984a62
Parents: b1f94a6
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Jul 7 10:02:30 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Jul 7 10:02:30 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-elasticsearch/README.md     | 9 ---------
 .../apache/streams/elasticsearch/ElasticsearchQuery.java    | 4 +---
 2 files changed, 1 insertion(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/738e7cac/streams-contrib/streams-persist-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/README.md b/streams-contrib/streams-persist-elasticsearch/README.md
index fbdf86c..c4182e4 100644
--- a/streams-contrib/streams-persist-elasticsearch/README.md
+++ b/streams-contrib/streams-persist-elasticsearch/README.md
@@ -36,12 +36,3 @@ Example writer configuration:
         "type": "destinationtype"
     }
 
-For more examples, see:
-
-- [elasticsearch-backup](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-backup)
-- [elasticsearch-reindex](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-reindex)
-- [elasticsearch-restore](http://github.com/w2ogroup/streams-examples/tree/master/elasticsearch-restore)
-- [mongo-elasticsearch-index](https://github.com/w2ogroup/streams-examples/tree/master/mongo-elasticsearch-index)
-- [twitter-gardenhose-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-gardenhose-elasticsearch)
-- [twitter-history-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-history-elasticsearch)
-- [twitter-userstream-elasticsearch](https://github.com/w2ogroup/streams-examples/tree/master/twitter-userstream-elasticsearch)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/738e7cac/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 55064d3..fbc62c4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -146,8 +146,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
                     search = search.setExtraSource(searchJson);
 
                 } catch (JsonProcessingException e) {
-                    LOGGER.warn("Could not apply _search supplied by config");
-                    e.printStackTrace();
+                    LOGGER.warn("Could not apply _search supplied by config", e.getMessage());
                 }
 
                 LOGGER.debug("Search Source is now " + search.toString());
@@ -239,7 +238,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
                 totalRead += 1;
             }
         } catch (Exception e) {
-            e.printStackTrace();
             LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
             scrollPositionInScroll = -1;
             next = null;