Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/22 19:13:04 UTC
[2/3] git commit: Refactored Elasticsearch components into their own class for better cohesion
Refactored Elasticsearch components into their own class for better cohesion
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b5fd7e70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b5fd7e70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b5fd7e70
Branch: refs/heads/STREAMS-58
Commit: b5fd7e70aa81515c4278cbaec788fbf3f81b8d58
Parents: af1efe8
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 22 11:34:39 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 22 11:34:39 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchPersistReader.java | 308 ++++---------------
.../ElasticsearchPersistReaderTask.java | 63 ----
.../elasticsearch/ElasticsearchQuery.java | 282 +++++++++++++++++
3 files changed, 339 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
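For context, a minimal usage sketch of the refactored classes, assuming an ElasticsearchReaderConfiguration populated elsewhere (e.g. via ElasticsearchConfigurator); the wiring shown here is illustrative only and is not part of this commit:

import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;

public class ReaderUsageSketch {
    public static void main(String[] args) {
        // Assumption: the configuration carries hosts, indexes and types;
        // how it is populated is outside the scope of this commit.
        ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();

        // After this refactor, prepare() builds an ElasticsearchQuery from the
        // configuration and executes the scan/scroll, and startStream() launches
        // the nested ElasticsearchPersistReaderTask that drains the scroll into
        // the reader's bounded queue.
        ElasticsearchPersistReader reader = new ElasticsearchPersistReader(config);
        reader.prepare(null);
        reader.startStream();

        // readCurrent() hands back what has been read so far and swaps in a fresh queue.
        StreamsResultSet batch = reader.readCurrent();
    }
}
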
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 72a9954..fd2a155 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -1,25 +1,17 @@
package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.index.query.FilterBuilder;
-import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortBuilders;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.*;
@@ -32,95 +24,23 @@ import java.util.concurrent.*;
* ************************************************************************************************************
*/
-public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit> {
+public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
public static final String STREAMS_ID = "ElasticsearchPersistReader";
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
- private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
- private static final Integer DEFAULT_BATCH_SIZE = 500;
- private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
protected volatile Queue<StreamsDatum> persistQueue;
- private ElasticsearchClientManager elasticsearchClientManager;
- private List<String> indexes = Lists.newArrayList();
- private List<String> types = Lists.newArrayList();
- private String[] withfields;
- private String[] withoutfields;
- private DateTime startDate;
- private DateTime endDate;
- private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil
- private boolean random = false;
+ private ElasticsearchQuery elasticsearchQuery;
+ private ElasticsearchReaderConfiguration config;
private int threadPoolSize = 10;
- private int batchSize = 100;
- private String scrollTimeout = null;
-
- private ObjectMapper mapper;
-
- private ElasticsearchConfiguration config;
-
private ExecutorService executor;
- private QueryBuilder queryBuilder;
- private FilterBuilder filterBuilder;
-
- // These are private to help us manage the scroll
- private SearchRequestBuilder search;
- private SearchResponse scrollResp;
- private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
- private SearchHit next = null;
- private long totalHits = 0;
- private long totalRead = 0;
-
public ElasticsearchPersistReader() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
- }
-
- public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
- this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
- indexes.addAll(elasticsearchConfiguration.getIndexes());
- types.addAll(elasticsearchConfiguration.getTypes());
- }
-
- public long getHitCount() {
- return this.search == null ? 0 : this.totalHits;
- }
-
- public long getReadCount() {
- return this.totalRead;
- }
-
- public double getReadPercent() {
- return (double) this.getReadCount() / (double) this.getHitCount();
- }
-
- public long getRemainingCount() {
- return this.totalRead - this.totalHits;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
}
- public void setScrollTimeout(String scrollTimeout) {
- this.scrollTimeout = scrollTimeout;
- }
-
- public void setQueryBuilder(QueryBuilder queryBuilder) {
- this.queryBuilder = queryBuilder;
- }
-
- public void setFilterBuilder(FilterBuilder filterBuilder) {
- this.filterBuilder = filterBuilder;
- }
-
- public void setWithfields(String[] withfields) {
- this.withfields = withfields;
- }
-
- public void setWithoutfields(String[] withoutfields) {
- this.withoutfields = withoutfields;
+ public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) {
+ this.config = config;
}
//PersistReader methods
@@ -128,57 +48,14 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
public void startStream() {
LOGGER.debug("startStream");
executor = Executors.newSingleThreadExecutor();
- executor.submit(new ElasticsearchPersistReaderTask(this));
+ executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
}
@Override
public void prepare(Object o) {
-
- mapper = StreamsJacksonMapper.getInstance();
- persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
-
- // If we haven't already set up the search, then set up the search.
- if (search == null) {
- search = elasticsearchClientManager.getClient()
- .prepareSearch(indexes.toArray(new String[0]))
- .setSearchType(SearchType.SCAN)
- .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
- .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
-
- if (this.queryBuilder != null)
- search.setQuery(this.queryBuilder);
-
- // If the types are null, then don't specify a type
- if (this.types != null && this.types.size() > 0)
- search = search.setTypes(types.toArray(new String[0]));
-
- Integer clauses = 0;
- if (this.withfields != null || this.withoutfields != null) {
- if (this.withfields != null)
- clauses += this.withfields.length;
- if (this.withoutfields != null)
- clauses += this.withoutfields.length;
- }
-
- List<FilterBuilder> filterList = buildFilterList();
-
- FilterBuilder allFilters = andFilters(filterList);
-
- if (clauses > 0) {
- // search.setPostFilter(allFilters);
- search.setPostFilter(allFilters);
- }
-
- // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
- if (this.random)
- search = search.addSort(SortBuilders.scriptSort("random()", "number"));
- }
-
- // We don't have a scroll, we need to create a scroll
- if (scrollResp == null) {
- scrollResp = search.execute().actionGet();
- LOGGER.trace(search.toString());
- }
+ elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(config);
+ elasticsearchQuery.execute(o);
+ persistQueue = constructQueue();
}
@Override
@@ -192,12 +69,12 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
StreamsResultSet current;
synchronized (ElasticsearchPersistReader.class) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current = new StreamsResultSet(persistQueue);
current.setCounter(new DatumStatusCounter());
// current.getCounter().add(countersCurrent);
// countersTotal.add(countersCurrent);
// countersCurrent = new DatumStatusCounter();
- persistQueue.clear();
+ persistQueue = constructQueue();
}
return current;
@@ -218,69 +95,20 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
@Override
public void cleanUp() {
+ this.shutdownAndAwaitTermination(executor);
LOGGER.info("PersistReader done");
}
- //Iterable methods
- @Override
- public Iterator<SearchHit> iterator() {
- return this;
- }
-
- //Iterator methods
- @Override
- public SearchHit next() {
- return this.next;
- }
-
- @Override
- public boolean hasNext() {
- calcNext();
- return hasRecords();
- }
-
-
- public void calcNext() {
- try {
- // We have exhausted our scroll create another scroll.
- if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
- // reset the scroll position
- scrollPositionInScroll = 0;
-
- // get the next hits of the scroll
- scrollResp = elasticsearchClientManager.getClient()
- .prepareSearchScroll(scrollResp.getScrollId())
- .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
- .execute()
- .actionGet();
-
- this.totalHits = scrollResp.getHits().getTotalHits();
- }
-
- // If this scroll has 0 items then we set the scroll position to -1
- // letting the iterator know that we are done.
- if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0)
- scrollPositionInScroll = -1;
- else {
- // get the next record
- next = scrollResp.getHits().getAt(scrollPositionInScroll);
-
- // Increment our counters
- scrollPositionInScroll += 1;
- totalRead += 1;
- }
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
- scrollPositionInScroll = -1;
- next = null;
+ protected void write(StreamsDatum entry) {
+ boolean success;
+ do {
+ success = persistQueue.offer(entry);
+ Thread.yield();
}
+ while (!success);
}
- public void remove() {
- }
-
- void shutdownAndAwaitTermination(ExecutorService pool) {
+ protected void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
@@ -288,7 +116,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
+ LOGGER.error("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
@@ -298,72 +126,50 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
}
}
- private boolean isCompleted() {
- return totalRead >= this.limit && hasRecords();
- }
-
- private boolean hasRecords() {
- return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
- }
-
- // copied from elasticsearch
- // if we need this again we should factor it out into a utility
- private FilterBuilder andFilters(List<FilterBuilder> filters) {
- if (filters == null || filters.size() == 0)
- return null;
-
- FilterBuilder toReturn = filters.get(0);
-
- for (int i = 1; i < filters.size(); i++)
- toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
-
- return toReturn;
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
}
- private FilterBuilder orFilters(List<FilterBuilder> filters) {
- if (filters == null || filters.size() == 0)
- return null;
+ public static class ElasticsearchPersistReaderTask implements Runnable {
- FilterBuilder toReturn = filters.get(0);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
- for (int i = 1; i < filters.size(); i++)
- toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+ private ElasticsearchPersistReader reader;
+ private ElasticsearchQuery query;
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- return toReturn;
- }
-
- private List<FilterBuilder> buildFilterList() {
-
- ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+ public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) {
+ this.reader = reader;
+ this.query = query;
+ }
- // If any withfields are specified, require that field be present
- // There must a value set also for the document to be processed
- if (this.withfields != null && this.withfields.length > 0) {
- ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
- for (String withfield : this.withfields) {
- FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
- withFilterList.add(withFilter);
+ @Override
+ public void run() {
+
+ StreamsDatum item;
+ while (query.hasNext()) {
+ SearchHit hit = query.next();
+ ObjectNode jsonObject = null;
+ try {
+ jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ break;
+ }
+ item = new StreamsDatum(jsonObject, hit.getId());
+ item.getMetadata().put("id", hit.getId());
+ item.getMetadata().put("index", hit.getIndex());
+ item.getMetadata().put("type", hit.getType());
+ reader.write(item);
}
- //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
- filterList.add(withFilterList.get(0));
- }
- // If any withoutfields are specified, require that field not be present
- // Document will be picked up even if present, if they do not have at least one value
- // this is annoying as it majorly impacts runtime
- // might be able to change behavior using null_field
- if (this.withoutfields != null && this.withoutfields.length > 0) {
- ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
- for (String withoutfield : this.withoutfields) {
- FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
- withoutFilterList.add(withoutFilter);
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted", e);
}
- //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
- filterList.add(withoutFilterList.get(0));
- }
- return filterList;
+ }
}
-
}
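A side note on the hand-off above: the task now writes into the reader's bounded queue by spinning on offer() rather than sharing the queue field directly. A standalone sketch of that pattern, with the element type and capacity as assumptions taken from constructQueue():

import com.google.common.collect.Queues;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedHandOffSketch {
    // Mirrors constructQueue(): a synchronized wrapper over a bounded LinkedBlockingQueue.
    private final Queue<String> queue =
            Queues.synchronizedQueue(new LinkedBlockingQueue<String>(10000));

    // Same spin-on-offer pattern as ElasticsearchPersistReader.write(StreamsDatum):
    // offer() returns false while the queue is full, so the producer yields and retries
    // instead of blocking, which keeps back-pressure on the scroll consumer.
    void write(String entry) {
        boolean success;
        do {
            success = queue.offer(entry);
            Thread.yield();
        } while (!success);
    }
}
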
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
deleted file mode 100644
index 2d9c951..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.Random;
-
-public class ElasticsearchPersistReaderTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
-
- private ElasticsearchPersistReader reader;
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
- this.reader = reader;
- }
-
- @Override
- public void run() {
-
- StreamsDatum item;
- while( reader.hasNext()) {
- SearchHit hit = reader.next();
- ObjectNode jsonObject = null;
- try {
- jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
- item = new StreamsDatum(jsonObject, hit.getId());
- item.getMetadata().put("id", hit.getId());
- item.getMetadata().put("index", hit.getIndex());
- item.getMetadata().put("type", hit.getType());
- write(item);
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
-
- }
-
- private void write( StreamsDatum entry ) {
- boolean success;
- do {
- synchronized( ElasticsearchPersistReader.class ) {
- success = reader.persistQueue.offer(entry);
- }
- Thread.yield();
- }
- while( !success );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
new file mode 100644
index 0000000..8c9abda
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -0,0 +1,282 @@
+package org.apache.streams.elasticsearch;
+
+import com.google.common.collect.Lists;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class);
+ private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
+ private static final Integer DEFAULT_BATCH_SIZE = 500;
+ private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
+
+ private ElasticsearchClientManager elasticsearchClientManager;
+ private ElasticsearchConfiguration config;
+ private List<String> indexes = Lists.newArrayList();
+ private List<String> types = Lists.newArrayList();
+ private String[] withfields;
+ private String[] withoutfields;
+ private DateTime startDate;
+ private DateTime endDate;
+ private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil
+ private boolean random = false;
+ private int batchSize = 100;
+ private String scrollTimeout = null;
+ private org.elasticsearch.index.query.QueryBuilder queryBuilder;
+ private org.elasticsearch.index.query.FilterBuilder filterBuilder;// These are private to help us manage the scroll
+ private SearchRequestBuilder search;
+ private SearchResponse scrollResp;
+ private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
+ private SearchHit next = null;
+ private long totalHits = 0;
+ private long totalRead = 0;
+
+ public ElasticsearchQuery() {
+ Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+ this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ }
+
+ public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
+ this.config = config;
+ this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+ this.indexes.addAll(config.getIndexes());
+ this.types.addAll(config.getTypes());
+ }
+
+ public long getHitCount() {
+ return this.search == null ? 0 : this.totalHits;
+ }
+
+ public long getReadCount() {
+ return this.totalRead;
+ }
+
+ public double getReadPercent() {
+ return (double) this.getReadCount() / (double) this.getHitCount();
+ }
+
+ public long getRemainingCount() {
+ return this.totalRead - this.totalHits;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setScrollTimeout(String scrollTimeout) {
+ this.scrollTimeout = scrollTimeout;
+ }
+
+ public void setQueryBuilder(QueryBuilder queryBuilder) {
+ this.queryBuilder = queryBuilder;
+ }
+
+ public void setFilterBuilder(FilterBuilder filterBuilder) {
+ this.filterBuilder = filterBuilder;
+ }
+
+ public void setWithfields(String[] withfields) {
+ this.withfields = withfields;
+ }
+
+ public void setWithoutfields(String[] withoutfields) {
+ this.withoutfields = withoutfields;
+ }
+
+ public void execute(Object o) {
+
+ // If we haven't already set up the search, then set up the search.
+ if (search == null) {
+ search = elasticsearchClientManager.getClient()
+ .prepareSearch(indexes.toArray(new String[0]))
+ .setSearchType(SearchType.SCAN)
+ .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
+ .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+
+ if (this.queryBuilder != null)
+ search.setQuery(this.queryBuilder);
+
+ // If the types are null, then don't specify a type
+ if (this.types != null && this.types.size() > 0)
+ search = search.setTypes(types.toArray(new String[0]));
+
+ Integer clauses = 0;
+ if (this.withfields != null || this.withoutfields != null) {
+ if (this.withfields != null)
+ clauses += this.withfields.length;
+ if (this.withoutfields != null)
+ clauses += this.withoutfields.length;
+ }
+
+ List<FilterBuilder> filterList = buildFilterList();
+
+ FilterBuilder allFilters = andFilters(filterList);
+
+ if (clauses > 0) {
+ // search.setPostFilter(allFilters);
+ search.setPostFilter(allFilters);
+ }
+
+ // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
+ if (this.random)
+ search = search.addSort(SortBuilders.scriptSort("random()", "number"));
+ }
+
+ // We don't have a scroll, we need to create a scroll
+ if (scrollResp == null) {
+ scrollResp = search.execute().actionGet();
+ LOGGER.trace(search.toString());
+ }
+ }
+
+ //Iterable methods
+ @Override
+ public Iterator<SearchHit> iterator() {
+ return this;
+ }
+
+ //Iterator methods
+ @Override
+ public SearchHit next() {
+ return this.next;
+ }
+
+ @Override
+ public boolean hasNext() {
+ calcNext();
+ return hasRecords();
+ }
+
+ public void calcNext() {
+ try {
+ // We have exhausted our scroll create another scroll.
+ if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
+ // reset the scroll position
+ scrollPositionInScroll = 0;
+
+ // get the next hits of the scroll
+ scrollResp = elasticsearchClientManager.getClient()
+ .prepareSearchScroll(scrollResp.getScrollId())
+ .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+ .execute()
+ .actionGet();
+
+ this.totalHits = scrollResp.getHits().getTotalHits();
+ }
+
+ // If this scroll has 0 items then we set the scroll position to -1
+ // letting the iterator know that we are done.
+ if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0)
+ scrollPositionInScroll = -1;
+ else {
+ // get the next record
+ next = scrollResp.getHits().getAt(scrollPositionInScroll);
+
+ // Increment our counters
+ scrollPositionInScroll += 1;
+ totalRead += 1;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
+ scrollPositionInScroll = -1;
+ next = null;
+ }
+ }
+
+ public void remove() {
+ }
+
+ protected boolean isCompleted() {
+ return totalRead >= this.limit && hasRecords();
+ }
+
+ protected boolean hasRecords() {
+ return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
+ }
+
+ // copied from elasticsearch
+ // if we need this again we should factor it out into a utility
+ private FilterBuilder andFilters(List<FilterBuilder> filters) {
+ if (filters == null || filters.size() == 0)
+ return null;
+
+ FilterBuilder toReturn = filters.get(0);
+
+ for (int i = 1; i < filters.size(); i++)
+ toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
+
+ return toReturn;
+ }
+
+ private FilterBuilder orFilters(List<FilterBuilder> filters) {
+ if (filters == null || filters.size() == 0)
+ return null;
+
+ FilterBuilder toReturn = filters.get(0);
+
+ for (int i = 1; i < filters.size(); i++)
+ toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+
+ return toReturn;
+ }
+
+ private List<FilterBuilder> buildFilterList() {
+
+ // If any withfields are specified, require that field be present
+ // There must a value set also for the document to be processed
+ // If any withoutfields are specified, require that field not be present
+ // Document will be picked up even if present, if they do not have at least one value
+ // this is annoying as it majorly impacts runtime
+ // might be able to change behavior using null_field
+
+ ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+
+ // If any withfields are specified, require that field be present
+ // There must a value set also for the document to be processed
+ if (this.withfields != null && this.withfields.length > 0) {
+ ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
+ for (String withfield : this.withfields) {
+ FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
+ withFilterList.add(withFilter);
+ }
+ //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
+ filterList.add(withFilterList.get(0));
+ }
+ // If any withoutfields are specified, require that field not be present
+ // Document will be picked up even if present, if they do not have at least one value
+ // this is annoying as it majorly impacts runtime
+ // might be able to change behavior using null_field
+ if (this.withoutfields != null && this.withoutfields.length > 0) {
+ ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
+ for (String withoutfield : this.withoutfields) {
+ FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
+ withoutFilterList.add(withoutFilter);
+ }
+ //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
+ filterList.add(withoutFilterList.get(0));
+ }
+
+ return filterList;
+ }
+}
\ No newline at end of file
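
Finally, a sketch of driving the new ElasticsearchQuery directly, based only on the methods added above (execute(), hasNext(), next() over SearchHit); the configuration and the printing are assumptions for illustration:

import org.apache.streams.elasticsearch.ElasticsearchQuery;
import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
import org.elasticsearch.search.SearchHit;

public class QueryUsageSketch {
    public static void main(String[] args) {
        // Assumed to be populated elsewhere with hosts, indexes and types.
        ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();

        ElasticsearchQuery query = new ElasticsearchQuery(config);
        query.execute(null);  // builds the scan/scroll request and fetches the first response

        // The query is its own Iterator<SearchHit>; hasNext() advances the scroll as needed.
        while (query.hasNext()) {
            SearchHit hit = query.next();
            System.out.println(hit.getId() + " from " + hit.getIndex() + "/" + hit.getType());
        }
    }
}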