Posted to commits@streams.apache.org by mf...@apache.org on 2014/04/22 19:13:04 UTC

[2/3] git commit: Refactored Elasticsearch components into their own class for better cohesion

Refactored Elasticsearch components into their own class for better cohesion


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b5fd7e70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b5fd7e70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b5fd7e70

Branch: refs/heads/STREAMS-58
Commit: b5fd7e70aa81515c4278cbaec788fbf3f81b8d58
Parents: af1efe8
Author: mfranklin <mf...@apache.org>
Authored: Tue Apr 22 11:34:39 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Apr 22 11:34:39 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistReader.java             | 308 ++++---------------
 .../ElasticsearchPersistReaderTask.java         |  63 ----
 .../elasticsearch/ElasticsearchQuery.java       | 282 +++++++++++++++++
 3 files changed, 339 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
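
For orientation, here is how the refactored pieces are meant to fit together
after this commit: the reader delegates all query/scroll state to the new
ElasticsearchQuery and pumps hits onto its queue from the (now inner)
ElasticsearchPersistReaderTask. A minimal, hypothetical sketch follows; the
index/type values are illustrative, the configuration setters are assumed to
mirror the getIndexes()/getTypes() accessors used in the diff, and the drain
loop is only one way to consume readCurrent() batches:

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.core.StreamsResultSet;
    import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
    import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;

    import java.util.Arrays;

    public class ElasticsearchReaderExample {
        public static void main(String[] args) throws Exception {
            ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();
            config.setIndexes(Arrays.asList("activity"));   // hypothetical index
            config.setTypes(Arrays.asList("activity"));     // hypothetical type

            ElasticsearchPersistReader reader = new ElasticsearchPersistReader(config);
            reader.prepare(null);   // builds the ElasticsearchQuery and opens the scan/scroll
            reader.startStream();   // submits the inner ElasticsearchPersistReaderTask

            // Illustrative drain loop: each readCurrent() call hands back the
            // current batch and swaps a fresh queue in behind it.
            for (int i = 0; i < 10; i++) {
                StreamsResultSet batch = reader.readCurrent();
                for (StreamsDatum datum : batch)
                    System.out.println(datum.getId() + " -> " + datum.getDocument());
                Thread.sleep(1000);
            }
            reader.cleanUp();
        }
    }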


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 72a9954..fd2a155 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -1,25 +1,17 @@
 package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.index.query.FilterBuilder;
-import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortBuilders;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
 import java.util.concurrent.*;
@@ -32,95 +24,23 @@ import java.util.concurrent.*;
  * ************************************************************************************************************
  */
 
-public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit> {
+public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
     public static final String STREAMS_ID = "ElasticsearchPersistReader";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
-    private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
-    private static final Integer DEFAULT_BATCH_SIZE = 500;
-    private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private List<String> indexes = Lists.newArrayList();
-    private List<String> types = Lists.newArrayList();
-    private String[] withfields;
-    private String[] withoutfields;
-    private DateTime startDate;
-    private DateTime endDate;
-    private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil
-    private boolean random = false;
+    private ElasticsearchQuery elasticsearchQuery;
+    private ElasticsearchReaderConfiguration config;
     private int threadPoolSize = 10;
-    private int batchSize = 100;
-    private String scrollTimeout = null;
-
-    private ObjectMapper mapper;
-
-    private ElasticsearchConfiguration config;
-
     private ExecutorService executor;
 
-    private QueryBuilder queryBuilder;
-    private FilterBuilder filterBuilder;
-
-    // These are private to help us manage the scroll
-    private SearchRequestBuilder search;
-    private SearchResponse scrollResp;
-    private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
-    private SearchHit next = null;
-    private long totalHits = 0;
-    private long totalRead = 0;
-
     public ElasticsearchPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
-    }
-
-    public ElasticsearchPersistReader(ElasticsearchReaderConfiguration elasticsearchConfiguration) {
-        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
-        indexes.addAll(elasticsearchConfiguration.getIndexes());
-        types.addAll(elasticsearchConfiguration.getTypes());
-    }
-
-    public long getHitCount() {
-        return this.search == null ? 0 : this.totalHits;
-    }
-
-    public long getReadCount() {
-        return this.totalRead;
-    }
-
-    public double getReadPercent() {
-        return (double) this.getReadCount() / (double) this.getHitCount();
-    }
-
-    public long getRemainingCount() {
-        return this.totalRead - this.totalHits;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
     }
 
-    public void setScrollTimeout(String scrollTimeout) {
-        this.scrollTimeout = scrollTimeout;
-    }
-
-    public void setQueryBuilder(QueryBuilder queryBuilder) {
-        this.queryBuilder = queryBuilder;
-    }
-
-    public void setFilterBuilder(FilterBuilder filterBuilder) {
-        this.filterBuilder = filterBuilder;
-    }
-
-    public void setWithfields(String[] withfields) {
-        this.withfields = withfields;
-    }
-
-    public void setWithoutfields(String[] withoutfields) {
-        this.withoutfields = withoutfields;
+    public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) {
+        this.config = config;
     }
 
     //PersistReader methods
@@ -128,57 +48,14 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
     public void startStream() {
         LOGGER.debug("startStream");
         executor = Executors.newSingleThreadExecutor();
-        executor.submit(new ElasticsearchPersistReaderTask(this));
+        executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
     }
 
     @Override
     public void prepare(Object o) {
-
-        mapper = StreamsJacksonMapper.getInstance();
-        persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
-
-        // If we haven't already set up the search, then set up the search.
-        if (search == null) {
-            search = elasticsearchClientManager.getClient()
-                    .prepareSearch(indexes.toArray(new String[0]))
-                    .setSearchType(SearchType.SCAN)
-                    .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
-                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
-
-            if (this.queryBuilder != null)
-                search.setQuery(this.queryBuilder);
-
-            // If the types are null, then don't specify a type
-            if (this.types != null && this.types.size() > 0)
-                search = search.setTypes(types.toArray(new String[0]));
-
-            Integer clauses = 0;
-            if (this.withfields != null || this.withoutfields != null) {
-                if (this.withfields != null)
-                    clauses += this.withfields.length;
-                if (this.withoutfields != null)
-                    clauses += this.withoutfields.length;
-            }
-
-            List<FilterBuilder> filterList = buildFilterList();
-
-            FilterBuilder allFilters = andFilters(filterList);
-
-            if (clauses > 0) {
-                //    search.setPostFilter(allFilters);
-                search.setPostFilter(allFilters);
-            }
-
-            // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
-            if (this.random)
-                search = search.addSort(SortBuilders.scriptSort("random()", "number"));
-        }
-
-        // We don't have a scroll, we need to create a scroll
-        if (scrollResp == null) {
-            scrollResp = search.execute().actionGet();
-            LOGGER.trace(search.toString());
-        }
+        elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(config);
+        elasticsearchQuery.execute(o);
+        persistQueue = constructQueue();
     }
 
     @Override
@@ -192,12 +69,12 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
         StreamsResultSet current;
 
         synchronized (ElasticsearchPersistReader.class) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+            current = new StreamsResultSet(persistQueue);
             current.setCounter(new DatumStatusCounter());
 //            current.getCounter().add(countersCurrent);
 //            countersTotal.add(countersCurrent);
 //            countersCurrent = new DatumStatusCounter();
-            persistQueue.clear();
+            persistQueue = constructQueue();
         }
 
         return current;
@@ -218,69 +95,20 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
 
     @Override
     public void cleanUp() {
+        this.shutdownAndAwaitTermination(executor);
         LOGGER.info("PersistReader done");
     }
 
-    //Iterable methods
-    @Override
-    public Iterator<SearchHit> iterator() {
-        return this;
-    }
-
-    //Iterator methods
-    @Override
-    public SearchHit next() {
-        return this.next;
-    }
-
-    @Override
-    public boolean hasNext() {
-        calcNext();
-        return hasRecords();
-    }
-
-
-    public void calcNext() {
-        try {
-            // We have exhausted our scroll create another scroll.
-            if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
-                // reset the scroll position
-                scrollPositionInScroll = 0;
-
-                // get the next hits of the scroll
-                scrollResp = elasticsearchClientManager.getClient()
-                        .prepareSearchScroll(scrollResp.getScrollId())
-                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
-                        .execute()
-                        .actionGet();
-
-                this.totalHits = scrollResp.getHits().getTotalHits();
-            }
-
-            // If this scroll has 0 items then we set the scroll position to -1
-            // letting the iterator know that we are done.
-            if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0)
-                scrollPositionInScroll = -1;
-            else {
-                // get the next record
-                next = scrollResp.getHits().getAt(scrollPositionInScroll);
-
-                // Increment our counters
-                scrollPositionInScroll += 1;
-                totalRead += 1;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
-            scrollPositionInScroll = -1;
-            next = null;
+    protected void write(StreamsDatum entry) {
+        boolean success;
+        do {
+            success = persistQueue.offer(entry);
+            Thread.yield();
         }
+        while (!success);
     }
 
-    public void remove() {
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
+    protected void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {
             // Wait a while for existing tasks to terminate
@@ -288,7 +116,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
                 pool.shutdownNow(); // Cancel currently executing tasks
                 // Wait a while for tasks to respond to being cancelled
                 if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
+                    LOGGER.error("Pool did not terminate");
             }
         } catch (InterruptedException ie) {
             // (Re-)Cancel if current thread also interrupted
@@ -298,72 +126,50 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Iterabl
         }
     }
 
-    private boolean isCompleted() {
-        return totalRead >= this.limit && hasRecords();
-    }
-
-    private boolean hasRecords() {
-        return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
-    }
-
-    // copied from elasticsearch
-    // if we need this again we should factor it out into a utility
-    private FilterBuilder andFilters(List<FilterBuilder> filters) {
-        if (filters == null || filters.size() == 0)
-            return null;
-
-        FilterBuilder toReturn = filters.get(0);
-
-        for (int i = 1; i < filters.size(); i++)
-            toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
-
-        return toReturn;
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
     }
 
-    private FilterBuilder orFilters(List<FilterBuilder> filters) {
-        if (filters == null || filters.size() == 0)
-            return null;
+    public static class ElasticsearchPersistReaderTask implements Runnable {
 
-        FilterBuilder toReturn = filters.get(0);
+        private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
 
-        for (int i = 1; i < filters.size(); i++)
-            toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+        private ElasticsearchPersistReader reader;
+        private ElasticsearchQuery query;
+        private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-        return toReturn;
-    }
-
-    private List<FilterBuilder> buildFilterList() {
-
-        ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+        public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) {
+            this.reader = reader;
+            this.query = query;
+        }
 
-        // If any withfields are specified, require that field be present
-        //    There must a value set also for the document to be processed
-        if (this.withfields != null && this.withfields.length > 0) {
-            ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
-            for (String withfield : this.withfields) {
-                FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
-                withFilterList.add(withFilter);
+        @Override
+        public void run() {
+
+            StreamsDatum item;
+            while (query.hasNext()) {
+                SearchHit hit = query.next();
+                ObjectNode jsonObject = null;
+                try {
+                    jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+                } catch (IOException e) {
+                    LOGGER.error("Failed to parse document source for hit {}", hit.getId(), e);
+                    break;
+                }
+                item = new StreamsDatum(jsonObject, hit.getId());
+                item.getMetadata().put("id", hit.getId());
+                item.getMetadata().put("index", hit.getIndex());
+                item.getMetadata().put("type", hit.getType());
+                reader.write(item);
             }
-            //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
-            filterList.add(withFilterList.get(0));
-        }
-        // If any withoutfields are specified, require that field not be present
-        //    Document will be picked up even if present, if they do not have at least one value
-        // this is annoying as it majorly impacts runtime
-        // might be able to change behavior using null_field
-        if (this.withoutfields != null && this.withoutfields.length > 0) {
-            ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
-            for (String withoutfield : this.withoutfields) {
-                FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
-                withoutFilterList.add(withoutFilter);
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted", e);
             }
-            //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
-            filterList.add(withoutFilterList.get(0));
-        }
 
-        return filterList;
+        }
     }
-
 }
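
One behavioral note on the hand-off above: the old readCurrent() copied
persistQueue into the result set and then cleared it, which could drop any
datum offered between the copy and the clear; the new version hands the live
queue to the StreamsResultSet and installs a fresh one via constructQueue().
A minimal generic sketch of that swap pattern (names here are illustrative,
not from the commit):

    import com.google.common.collect.Queues;

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueSwapSketch<T> {
        private volatile Queue<T> live = newQueue();

        private static <T> Queue<T> newQueue() {
            // bounded, synchronized queue, as in the reader's constructQueue()
            return Queues.synchronizedQueue(new LinkedBlockingQueue<T>(10000));
        }

        // Producer side: spin until the bounded queue accepts the element,
        // mirroring the reader's write(StreamsDatum) loop.
        public void write(T entry) {
            while (!live.offer(entry)) {
                Thread.yield(); // queue full; let the consumer drain
            }
        }

        // Consumer side: claim the whole current batch and install an empty
        // queue, so nothing offered concurrently is lost.
        public synchronized Queue<T> drainBatch() {
            Queue<T> batch = live;
            live = newQueue();
            return batch;
        }
    }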
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
deleted file mode 100644
index 2d9c951..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.Random;
-
-public class ElasticsearchPersistReaderTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
-
-    private ElasticsearchPersistReader reader;
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
-        this.reader = reader;
-    }
-
-    @Override
-    public void run() {
-
-        StreamsDatum item;
-        while( reader.hasNext()) {
-            SearchHit hit = reader.next();
-            ObjectNode jsonObject = null;
-            try {
-                jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-                break;
-            }
-            item = new StreamsDatum(jsonObject, hit.getId());
-            item.getMetadata().put("id", hit.getId());
-            item.getMetadata().put("index", hit.getIndex());
-            item.getMetadata().put("type", hit.getType());
-            write(item);
-        }
-        try {
-            Thread.sleep(new Random().nextInt(100));
-        } catch (InterruptedException e) {}
-
-    }
-
-    private void write( StreamsDatum entry ) {
-        boolean success;
-        do {
-            synchronized( ElasticsearchPersistReader.class ) {
-                success = reader.persistQueue.offer(entry);
-            }
-            Thread.yield();
-        }
-        while( !success );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b5fd7e70/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
new file mode 100644
index 0000000..8c9abda
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -0,0 +1,282 @@
+package org.apache.streams.elasticsearch;
+
+import com.google.common.collect.Lists;
+import com.google.common.base.Objects;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class);
+    private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
+    private static final Integer DEFAULT_BATCH_SIZE = 500;
+    private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
+
+    private ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchConfiguration config;
+    private List<String> indexes = Lists.newArrayList();
+    private List<String> types = Lists.newArrayList();
+    private String[] withfields;
+    private String[] withoutfields;
+    private DateTime startDate;
+    private DateTime endDate;
+    private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil
+    private boolean random = false;
+    private int batchSize = 100;
+    private String scrollTimeout = null;
+    private QueryBuilder queryBuilder;
+    private FilterBuilder filterBuilder;
+
+    // These are private to help us manage the scroll
+    private SearchRequestBuilder search;
+    private SearchResponse scrollResp;
+    private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
+    private SearchHit next = null;
+    private long totalHits = 0;
+    private long totalRead = 0;
+
+    public ElasticsearchQuery() {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+    }
+
+    public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+        this.indexes.addAll(config.getIndexes());
+        this.types.addAll(config.getTypes());
+    }
+
+    public long getHitCount() {
+        return this.search == null ? 0 : this.totalHits;
+    }
+
+    public long getReadCount() {
+        return this.totalRead;
+    }
+
+    public double getReadPercent() {
+        return (double) this.getReadCount() / (double) this.getHitCount();
+    }
+
+    public long getRemainingCount() {
+        return this.totalHits - this.totalRead;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public void setScrollTimeout(String scrollTimeout) {
+        this.scrollTimeout = scrollTimeout;
+    }
+
+    public void setQueryBuilder(QueryBuilder queryBuilder) {
+        this.queryBuilder = queryBuilder;
+    }
+
+    public void setFilterBuilder(FilterBuilder filterBuilder) {
+        this.filterBuilder = filterBuilder;
+    }
+
+    public void setWithfields(String[] withfields) {
+        this.withfields = withfields;
+    }
+
+    public void setWithoutfields(String[] withoutfields) {
+        this.withoutfields = withoutfields;
+    }
+
+    public void execute(Object o) {
+
+        // If we haven't already set up the search, then set up the search.
+        if (search == null) {
+            search = elasticsearchClientManager.getClient()
+                    .prepareSearch(indexes.toArray(new String[0]))
+                    .setSearchType(SearchType.SCAN)
+                    .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
+                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+
+            if (this.queryBuilder != null)
+                search.setQuery(this.queryBuilder);
+
+            // If the types are null, then don't specify a type
+            if (this.types != null && this.types.size() > 0)
+                search = search.setTypes(types.toArray(new String[0]));
+
+            Integer clauses = 0;
+            if (this.withfields != null || this.withoutfields != null) {
+                if (this.withfields != null)
+                    clauses += this.withfields.length;
+                if (this.withoutfields != null)
+                    clauses += this.withoutfields.length;
+            }
+
+            List<FilterBuilder> filterList = buildFilterList();
+
+            FilterBuilder allFilters = andFilters(filterList);
+
+            if (clauses > 0) {
+                search.setPostFilter(allFilters);
+            }
+
+            // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
+            if (this.random)
+                search = search.addSort(SortBuilders.scriptSort("random()", "number"));
+        }
+
+        // We don't have a scroll, we need to create a scroll
+        if (scrollResp == null) {
+            scrollResp = search.execute().actionGet();
+            LOGGER.trace(search.toString());
+        }
+    }
+
+    //Iterable methods
+    @Override
+    public Iterator<SearchHit> iterator() {
+        return this;
+    }
+
+    //Iterator methods
+    @Override
+    public SearchHit next() {
+        return this.next;
+    }
+
+    @Override
+    public boolean hasNext() {
+        calcNext();
+        return hasRecords();
+    }
+
+    public void calcNext() {
+        try {
+            // We have exhausted our scroll create another scroll.
+            if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
+                // reset the scroll position
+                scrollPositionInScroll = 0;
+
+                // get the next hits of the scroll
+                scrollResp = elasticsearchClientManager.getClient()
+                        .prepareSearchScroll(scrollResp.getScrollId())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+                        .execute()
+                        .actionGet();
+
+                this.totalHits = scrollResp.getHits().getTotalHits();
+            }
+
+            // If this scroll has 0 items then we set the scroll position to -1
+            // letting the iterator know that we are done.
+            if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0)
+                scrollPositionInScroll = -1;
+            else {
+                // get the next record
+                next = scrollResp.getHits().getAt(scrollPositionInScroll);
+
+                // Increment our counters
+                scrollPositionInScroll += 1;
+                totalRead += 1;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Unexpected scrolling error", e);
+            scrollPositionInScroll = -1;
+            next = null;
+        }
+    }
+
+    public void remove() {
+    }
+
+    protected boolean isCompleted() {
+        return totalRead >= this.limit && hasRecords();
+    }
+
+    protected boolean hasRecords() {
+        return scrollPositionInScroll != -1 && this.totalRead <= this.limit;
+    }
+
+    // copied from elasticsearch
+    // if we need this again we should factor it out into a utility
+    private FilterBuilder andFilters(List<FilterBuilder> filters) {
+        if (filters == null || filters.size() == 0)
+            return null;
+
+        FilterBuilder toReturn = filters.get(0);
+
+        for (int i = 1; i < filters.size(); i++)
+            toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
+
+        return toReturn;
+    }
+
+    private FilterBuilder orFilters(List<FilterBuilder> filters) {
+        if (filters == null || filters.size() == 0)
+            return null;
+
+        FilterBuilder toReturn = filters.get(0);
+
+        for (int i = 1; i < filters.size(); i++)
+            toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+
+        return toReturn;
+    }
+
+    private List<FilterBuilder> buildFilterList() {
+
+        ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+
+        // If any withfields are specified, require that field be present
+        //    There must also be a value set for the document to be processed
+        if (this.withfields != null && this.withfields.length > 0) {
+            ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
+            for (String withfield : this.withfields) {
+                FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
+                withFilterList.add(withFilter);
+            }
+            //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
+            filterList.add(withFilterList.get(0));
+        }
+        // If any withoutfields are specified, require that field not be present
+        //    Documents will be picked up even if the field is present, if it does not have at least one value
+        // this is annoying as it majorly impacts runtime
+        // might be able to change behavior using null_field
+        if (this.withoutfields != null && this.withoutfields.length > 0) {
+            ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
+            for (String withoutfield : this.withoutfields) {
+                FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
+                withoutFilterList.add(withoutFilter);
+            }
+            //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
+            filterList.add(withoutFilterList.get(0));
+        }
+
+        return filterList;
+    }
+}
\ No newline at end of file
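
Since ElasticsearchQuery is both its own Iterable and Iterator over the
scroll, it can also be driven standalone. A short hypothetical sketch
(populating the configuration bean is elided, and matchAllQuery() is just an
illustrative query):

    import org.apache.streams.elasticsearch.ElasticsearchQuery;
    import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.SearchHit;

    public class ElasticsearchQueryExample {
        public static void main(String[] args) {
            ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();
            // ... set hosts, cluster, indexes, and types on the config bean ...

            ElasticsearchQuery query = new ElasticsearchQuery(config);
            query.setBatchSize(500);          // scroll page size
            query.setScrollTimeout("5m");     // keep-alive for each scroll request
            query.setQueryBuilder(QueryBuilders.matchAllQuery());

            query.execute(null);  // builds the SCAN search and fires the first request

            // hasNext() advances the scroll, fetching new pages as needed.
            for (SearchHit hit : query)
                System.out.println(hit.getIndex() + "/" + hit.getType() + "/" + hit.getId());
        }
    }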