You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/04 21:03:51 UTC

[1/2] git commit: updater / deleter implementation

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-112 [created] 0103a967b
  refs/heads/STREAMS-140 [created] 0be789cfd


updater / deleter implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0be789cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0be789cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0be789cf

Branch: refs/heads/STREAMS-140
Commit: 0be789cfdb23652f9f07e394b885598887f91ee7
Parents: c2d59a2
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 13:30:36 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 13:30:36 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 102 +++++
 .../ElasticsearchPersistUpdater.java            | 413 ++-----------------
 .../ElasticsearchPersistWriter.java             |  29 +-
 3 files changed, 146 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
new file mode 100644
index 0000000..fece72e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
+
+    public ElasticsearchPersistDeleter() {
+        super();
+    }
+
+    public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) {
+        super(config);
+    }
+
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        Preconditions.checkNotNull(streamsDatum);
+        Preconditions.checkNotNull(streamsDatum.getDocument());
+        Preconditions.checkNotNull(streamsDatum.getMetadata());
+        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+
+        String index;
+        String type;
+        String id;
+
+        index = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("index"))
+                .or(config.getIndex());
+        type = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("type"))
+                .or(config.getType());
+        id = (String) streamsDatum.getMetadata().get("id");
+
+        delete(index, type, id);
+
+    }
+
+    public void delete(String index, String type, String id) {
+        DeleteRequest deleteRequest;
+
+        Preconditions.checkNotNull(index);
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(type);
+
+        // Build the delete request for the given index, type, and (required) id.
+        deleteRequest = new DeleteRequest()
+                .index(index)
+                .type(type)
+                .id(id);
+
+        add(deleteRequest);
+
+    }
+
+    public void add(DeleteRequest request) {
+
+        Preconditions.checkNotNull(request);
+        Preconditions.checkNotNull(request.index());
+
+        // If our queue is larger than our flush threshold, then we should flush the queue.
+        synchronized (this) {
+            checkIndexImplications(request.index());
+
+            bulkRequest.add(request);
+
+            currentBatchItems.incrementAndGet();
+
+            checkForFlush();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..dbf7d25 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -56,96 +56,16 @@ import java.util.*;
 
 //import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 
-public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable {
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
-    private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
-    private final static NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
-
-    protected ElasticsearchClientManager manager;
-    protected Client client;
-    private String parentID = null;
-    protected BulkRequestBuilder bulkRequest;
-    private OutputStreamWriter currentWriter = null;
-
-    protected String index = null;
-    protected String type = null;
-    private int batchSize = 50;
-    private int totalRecordsWritten = 0;
-    private boolean veryLargeBulk = false;  // by default this setting is set to false
-
-    private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
-    private static final long WAITING_DOCS_LIMIT = 10000;
-
-    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
-
-    private volatile int totalSent = 0;
-    private volatile int totalSeconds = 0;
-    private volatile int totalOk = 0;
-    private volatile int totalFailed = 0;
-    private volatile int totalBatchCount = 0;
-    private volatile long totalSizeInBytes = 0;
-
-    private volatile long batchSizeInBytes = 0;
-    private volatile int batchItemsSent = 0;
-
-    public void setIndex(String index) {
-        this.index = index;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    private final List<String> affectedIndexes = new ArrayList<String>();
+public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
-    public long getFlushThresholdSizeInBytes() {
-        return flushThresholdSizeInBytes;
-    }
-
-    public int getTotalBatchCount() {
-        return totalBatchCount;
-    }
-
-    public long getBatchSizeInBytes() {
-        return batchSizeInBytes;
-    }
-
-    public int getBatchItemsSent() {
-        return batchItemsSent;
-    }
-
-    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
-        this.flushThresholdSizeInBytes = sizeInBytes;
-    }
-
-    Thread task;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    private ElasticsearchConfiguration config;
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
 
     public ElasticsearchPersistUpdater() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        super();
     }
 
-    public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
-        this.config = config;
-    }
-
-    private static final int BYTES_IN_MB = 1024 * 1024;
-    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-    private volatile int totalByteCount = 0;
-    private volatile int byteCount = 0;
-
-    public boolean isConnected() {
-        return (client != null);
+    public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
+        super(config);
     }
 
     @Override
@@ -156,15 +76,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         Preconditions.checkNotNull(streamsDatum.getMetadata());
         Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
 
+        String index;
+        String type;
         String id;
         String json;
         try {
 
-            json = mapper.writeValueAsString(streamsDatum.getDocument());
+            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
+            index = Optional.fromNullable(
+                    (String) streamsDatum.getMetadata().get("index"))
+                    .or(config.getIndex());
+            type = Optional.fromNullable(
+                    (String) streamsDatum.getMetadata().get("type"))
+                    .or(config.getType());
             id = (String) streamsDatum.getMetadata().get("id");
 
-            add(index, type, id, json);
+            update(index, type, id, json);
 
         } catch (JsonProcessingException e) {
             LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
@@ -172,184 +100,7 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         }
     }
 
-    public void start() {
-
-        manager = new ElasticsearchClientManager(config);
-        client = manager.getClient();
-
-        LOGGER.info(client.toString());
-    }
-
-    public void cleanUp() {
-
-        try {
-            flush();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        close();
-    }
-
-    @Override
-    public void close() {
-        try {
-            // before they close, check to ensure that
-            this.flush();
-
-            int count = 0;
-            // We are going to give it 5 minutes.
-            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
-                Thread.sleep(50);
-
-            if (this.getTotalOutstanding() > 0) {
-                LOGGER.error("We never cleared our buffer");
-            }
-
-
-            for (String indexName : this.getAffectedIndexes()) {
-                createIndexIfMissing(indexName);
-
-                if (this.veryLargeBulk) {
-                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
-                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
-                    // refreshing back on.
-                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
-
-                    // submit to ElasticSearch
-                    this.manager.getClient()
-                            .admin()
-                            .indices()
-                            .updateSettings(updateSettingsRequest)
-                            .actionGet();
-                }
-
-                checkIndexImplications(indexName);
-
-                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
-                this.manager.getClient()
-                        .admin()
-                        .indices()
-                        .prepareRefresh(indexName)
-                        .execute()
-                        .actionGet();
-            }
-
-            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
-
-        } catch (Exception e) {
-            // this line of code should be logically unreachable.
-            LOGGER.warn("This is unexpected: {}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void flush() throws IOException {
-        flushInternal();
-    }
-
-    public void flushInternal() {
-        synchronized (this) {
-            // we do not have a working bulk request, we can just exit here.
-            if (this.bulkRequest == null || batchItemsSent == 0)
-                return;
-
-            // call the flush command.
-            flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
-
-            // null the flush request, this will be created in the 'add' function below
-            this.bulkRequest = null;
-
-            // record the proper statistics, and add it to our totals.
-            this.totalSizeInBytes += this.batchSizeInBytes;
-            this.totalSent += batchItemsSent;
-
-            // reset the current batch statistics
-            this.batchSizeInBytes = 0;
-            this.batchItemsSent = 0;
-
-            try {
-                int count = 0;
-                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
-                    /****************************************************************************
-                     * Author:
-                     * Smashew
-                     *
-                     * Date:
-                     * 2013-10-20
-                     *
-                     * Note:
-                     * With the information that we have on hand. We need to develop a heuristic
-                     * that will determine when the cluster is having a problem indexing records
-                     * by telling it to pause and wait for it to catch back up. A
-                     *
-                     * There is an impact to us, the caller, whenever this happens as well. Items
-                     * that are not yet fully indexed by the server sit in a queue, on the client
-                     * that can cause the heap to overflow. This has been seen when re-indexing
-                     * large amounts of data to a small cluster. The "deletes" + "indexes" can
-                     * cause the server to have many 'outstandingItems" in queue. Running this
-                     * software with large amounts of data, on a small cluster, can re-create
-                     * this problem.
-                     *
-                     * DO NOT DELETE THESE LINES
-                     ****************************************************************************/
-
-                    // wait for the flush to catch up. We are going to cap this at
-                    while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
-                        Thread.sleep(10);
-
-                    if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
-                        LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
-                }
-            } catch (Exception e) {
-                LOGGER.info("We were broken from our loop: {}", e.getMessage());
-            }
-        }
-    }
-
-    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
-        bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
-            @Override
-            public void onResponse(BulkResponse bulkItemResponses) {
-                if (bulkItemResponses.hasFailures())
-                    LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
-
-                long thisFailed = 0;
-                long thisOk = 0;
-                long thisMillis = bulkItemResponses.getTookInMillis();
-
-                // keep track of the number of totalFailed and items that we have totalOk.
-                for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-                    if (resp.isFailed())
-                        thisFailed++;
-                    else
-                        thisOk++;
-                }
-
-                totalOk += thisOk;
-                totalFailed += thisFailed;
-                totalSeconds += (thisMillis / 1000);
-
-                if (thisSent != (thisOk + thisFailed))
-                    LOGGER.error("We sent more items than this");
-
-                LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
-                        MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
-                        MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
-            }
-
-            @Override
-            public void onFailure(Throwable e) {
-                LOGGER.error("Error bulk loading: {}", e.getMessage());
-                e.printStackTrace();
-            }
-        });
-
-        this.notify();
-    }
-
-    public void add(String indexName, String type, String id, String json) {
+    public void update(String indexName, String type, String id, String json) {
         UpdateRequest updateRequest;
 
         Preconditions.checkNotNull(id);
@@ -366,137 +117,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
     }
 
-    public void add(UpdateRequest updateRequest) {
-        Preconditions.checkNotNull(updateRequest);
-        synchronized (this) {
-            checkAndCreateBulkRequest();
-            checkIndexImplications(updateRequest.index());
-            bulkRequest.add(updateRequest);
-            try {
-                Optional<Integer> size = Objects.firstNonNull(
-                        Optional.fromNullable(updateRequest.doc().source().length()),
-                        Optional.fromNullable(updateRequest.script().length()));
-                trackItemAndBytesWritten(size.get().longValue());
-            } catch (NullPointerException x) {
-                trackItemAndBytesWritten(1000);
-            }
-        }
-    }
+    public void add(UpdateRequest request) {
 
-    private void trackItemAndBytesWritten(long sizeInBytes) {
-        batchItemsSent++;
-        batchSizeInBytes += sizeInBytes;
+        Preconditions.checkNotNull(request);
+        Preconditions.checkNotNull(request.index());
 
-        // If our queue is larger than our flush threashold, then we should flush the queue.
-        if (batchSizeInBytes > flushThresholdSizeInBytes)
-            flushInternal();
-    }
-
-    private void checkAndCreateBulkRequest() {
-        // Synchronize to ensure that we don't lose any records
+        // If our queue is larger than our flush threshold, then we should flush the queue.
         synchronized (this) {
-            if (bulkRequest == null)
-                bulkRequest = this.manager.getClient().prepareBulk();
-        }
-    }
-
-    private void checkIndexImplications(String indexName) {
-
-        // check to see if we have seen this index before.
-        if (this.affectedIndexes.contains(indexName))
-            return;
-
-        // we haven't log this index.
-        this.affectedIndexes.add(indexName);
-
-        // Check to see if we are in 'veryLargeBulk' mode
-        // if we aren't, exit early
-        if (!this.veryLargeBulk)
-            return;
-
-
-        // They are in 'very large bulk' mode we want to turn off refreshing the index.
-
-        // Create a request then add the setting to tell it to stop refreshing the interval
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
-
-        // submit to ElasticSearch
-        this.manager.getClient()
-                .admin()
-                .indices()
-                .updateSettings(updateSettingsRequest)
-                .actionGet();
-    }
-
-    public void createIndexIfMissing(String indexName) {
-        if (!this.manager.getClient()
-                .admin()
-                .indices()
-                .exists(new IndicesExistsRequest(indexName))
-                .actionGet()
-                .isExists()) {
-            // It does not exist... So we are going to need to create the index.
-            // we are going to assume that the 'templates' that we have loaded into
-            // elasticsearch are sufficient to ensure the index is being created properly.
-            CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
-
-            if (response.isAcknowledged()) {
-                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
-            } else {
-                LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
-                LOGGER.error("Error Message: {}", response.toString());
-                throw new RuntimeException("Unable to create index " + indexName);
-            }
-        }
-    }
+            checkIndexImplications(request.index());
 
-    public void add(String indexName, String type, Map<String, Object> toImport) {
-        for (String id : toImport.keySet())
-            add(indexName, type, id, (String) toImport.get(id));
-    }
-
-    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
-        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
-        for (String toAddId : workingBatch.keySet())
-            if (!invalidIDs.contains(toAddId))
-                add(index, type, toAddId, workingBatch.get(toAddId));
-
-        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
-    }
-
-
-    private Set<String> checkIds(Set<String> input, String index, String type) {
+            bulkRequest.add(request);
 
-        IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
+            currentBatchBytes.addAndGet(request.doc().source().length());
+            currentBatchItems.incrementAndGet();
 
-        for (String s : input)
-            idsFilterBuilder.addIds(s);
-
-        SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
-                .prepareSearch(index)
-                .setTypes(type)
-                .setQuery(idsFilterBuilder)
-                .addField("_id")
-                .setSize(input.size());
-
-        SearchHits hits = searchRequestBuilder.execute()
-                .actionGet()
-                .getHits();
-
-        Set<String> toReturn = new HashSet<String>();
-
-        for (SearchHit hit : hits) {
-            toReturn.add(hit.getId());
+            checkForFlush();
         }
 
-        return toReturn;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        start();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 05b0ef2..b83918d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
@@ -66,14 +67,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
     //A document should have to wait no more than 10s to get flushed
     private static final long DEFAULT_MAX_WAIT = 10000;
 
-    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+    protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
 
-    private final List<String> affectedIndexes = new ArrayList<String>();
+    protected final List<String> affectedIndexes = new ArrayList<String>();
 
-    private final ElasticsearchClientManager manager;
-    private final ElasticsearchWriterConfiguration config;
+    protected final ElasticsearchClientManager manager;
+    protected final ElasticsearchWriterConfiguration config;
 
-    private BulkRequestBuilder bulkRequest;
+    protected BulkRequestBuilder bulkRequest;
 
     private boolean veryLargeBulk = false;  // by default this setting is set to false
     private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
@@ -87,8 +88,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
     private final AtomicInteger batchesSent = new AtomicInteger(0);
     private final AtomicInteger batchesResponded = new AtomicInteger(0);
 
-    private final AtomicLong currentBatchItems = new AtomicLong(0);
-    private final AtomicLong currentBatchBytes = new AtomicLong(0);
+    protected final AtomicLong currentBatchItems = new AtomicLong(0);
+    protected final AtomicLong currentBatchBytes = new AtomicLong(0);
 
     private final AtomicLong totalSent = new AtomicLong(0);
     private final AtomicLong totalSeconds = new AtomicLong(0);
@@ -142,8 +143,16 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         checkForBackOff();
 
+        String index = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("index"))
+                .or(config.getIndex());
+        String type = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("type"))
+                .or(config.getType());
+        String id = (String) streamsDatum.getMetadata().get("id");
+
         try {
-            add(config.getIndex(), config.getType(), streamsDatum.getId(),
+            add(index, type, id,
                     streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
                     convertAndAppendMetadata(streamsDatum));
         } catch (Throwable e) {
@@ -333,7 +342,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private void checkForFlush() {
+    protected void checkForFlush() {
         synchronized (this) {
             if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
                     this.currentBatchItems.get() >= this.flushThresholdsRecords ||
@@ -344,7 +353,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private void checkIndexImplications(String indexName) {
+    protected void checkIndexImplications(String indexName) {
         // We need this to be safe across all writers that are currently being executed
         synchronized (ElasticsearchPersistWriter.class) {
 


[2/2] git commit: added timestamp logic removed code and logic deprecated by STREAMS-116

Posted by sb...@apache.org.
added timestamp logic
removed code and logic deprecated by STREAMS-116


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0103a967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0103a967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0103a967

Branch: refs/heads/STREAMS-112
Commit: 0103a967b0f65ead42215515b323771af7b66421
Parents: c2d59a2
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:02:41 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:02:41 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistReader.java             | 18 ++--
 .../elasticsearch/ElasticsearchQuery.java       | 95 ++------------------
 2 files changed, 16 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0103a967/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 7ba9b33..5411094 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -194,15 +194,19 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
                 ObjectNode jsonObject = null;
                 try {
                     jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+                    item = new StreamsDatum(jsonObject, hit.getId());
+                    item.getMetadata().put("id", hit.getId());
+                    item.getMetadata().put("index", hit.getIndex());
+                    item.getMetadata().put("type", hit.getType());
+                    if( hit.fields().containsKey("_timestamp")) {
+                        DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue());
+                        item.setTimestamp(timestamp);
+                    }
+                    reader.write(item);
                 } catch (IOException e) {
-                    e.printStackTrace();
-                    break;
+                    LOGGER.warn("Unable to process json source: ", hit.getSourceAsString());
                 }
-                item = new StreamsDatum(jsonObject, hit.getId());
-                item.getMetadata().put("id", hit.getId());
-                item.getMetadata().put("index", hit.getIndex());
-                item.getMetadata().put("type", hit.getType());
-                reader.write(item);
+
             }
             try {
                 Thread.sleep(new Random().nextInt(100));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0103a967/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index defd9dc..7699bd4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -117,14 +117,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
         this.filterBuilder = filterBuilder;
     }
 
-    public void setWithfields(String[] withfields) {
-        this.withfields = withfields;
-    }
-
-    public void setWithoutfields(String[] withoutfields) {
-        this.withoutfields = withoutfields;
-    }
-
     public void execute(Object o) {
 
         // If we haven't already set up the search, then set up the search.
@@ -133,8 +125,12 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
             search = elasticsearchClientManager.getClient()
                     .prepareSearch(indexes.toArray(new String[0]))
                     .setSearchType(SearchType.SCAN)
+                    .setExplain(true)
+                    .addField("*")
+                    .setFetchSource(true)
                     .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
-                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+                    .addField("_timestamp");
 
             String searchJson;
             if( config.getSearch() != null ) {
@@ -161,23 +157,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
             if (this.types != null && this.types.size() > 0)
                 search = search.setTypes(types.toArray(new String[0]));
 
-            Integer clauses = 0;
-            if (this.withfields != null || this.withoutfields != null) {
-                if (this.withfields != null)
-                    clauses += this.withfields.length;
-                if (this.withoutfields != null)
-                    clauses += this.withoutfields.length;
-            }
-
-            List<FilterBuilder> filterList = buildFilterList();
-
-            FilterBuilder allFilters = andFilters(filterList);
-
-            if (clauses > 0) {
-                //    search.setPostFilter(allFilters);
-                search = search.setPostFilter(allFilters);
-            }
-
             // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
             if (this.random)
                 search = search.addSort(SortBuilders.scriptSort("random()", "number"));
@@ -258,68 +237,4 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
         return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
     }
 
-    // copied from elasticsearch
-    // if we need this again we should factor it out into a utility
-    private FilterBuilder andFilters(List<FilterBuilder> filters) {
-        if (filters == null || filters.size() == 0)
-            return null;
-
-        FilterBuilder toReturn = filters.get(0);
-
-        for (int i = 1; i < filters.size(); i++)
-            toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
-
-        return toReturn;
-    }
-
-    private FilterBuilder orFilters(List<FilterBuilder> filters) {
-        if (filters == null || filters.size() == 0)
-            return null;
-
-        FilterBuilder toReturn = filters.get(0);
-
-        for (int i = 1; i < filters.size(); i++)
-            toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
-
-        return toReturn;
-    }
-
-    private List<FilterBuilder> buildFilterList() {
-
-        // If any withfields are specified, require that field be present
-        //    There must a value set also for the document to be processed
-        // If any withoutfields are specified, require that field not be present
-        //    Document will be picked up even if present, if they do not have at least one value
-        // this is annoying as it majorly impacts runtime
-        // might be able to change behavior using null_field
-
-        ArrayList<FilterBuilder> filterList = Lists.newArrayList();
-
-        // If any withfields are specified, require that field be present
-        //    There must a value set also for the document to be processed
-        if (this.withfields != null && this.withfields.length > 0) {
-            ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
-            for (String withfield : this.withfields) {
-                FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
-                withFilterList.add(withFilter);
-            }
-            //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
-            filterList.add(withFilterList.get(0));
-        }
-        // If any withoutfields are specified, require that field not be present
-        //    Document will be picked up even if present, if they do not have at least one value
-        // this is annoying as it majorly impacts runtime
-        // might be able to change behavior using null_field
-        if (this.withoutfields != null && this.withoutfields.length > 0) {
-            ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
-            for (String withoutfield : this.withoutfields) {
-                FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
-                withoutFilterList.add(withoutFilter);
-            }
-            //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
-            filterList.add(withoutFilterList.get(0));
-        }
-
-        return filterList;
-    }
 }