You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/04 21:03:51 UTC
[1/2] git commit: updater / deleter implementation
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-112 [created] 0103a967b
refs/heads/STREAMS-140 [created] 0be789cfd
updater / deleter implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0be789cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0be789cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0be789cf
Branch: refs/heads/STREAMS-140
Commit: 0be789cfdb23652f9f07e394b885598887f91ee7
Parents: c2d59a2
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 13:30:36 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 13:30:36 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 102 +++++
.../ElasticsearchPersistUpdater.java | 413 ++-----------------
.../ElasticsearchPersistWriter.java | 29 +-
3 files changed, 146 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
new file mode 100644
index 0000000..fece72e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StreamsPersistWriter} that removes documents from Elasticsearch.
+ *
+ * <p>Each incoming {@link StreamsDatum} is turned into a {@link DeleteRequest} and appended to the
+ * bulk request inherited from {@link ElasticsearchPersistWriter}; the inherited flush check decides
+ * when the accumulated batch is sent.
+ *
+ * <p>The target index and type are read from the datum's metadata ("index" / "type") and fall back
+ * to the configured defaults. The id of the document to delete must be present in the metadata
+ * under "id".
+ */
+public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
+
+    /**
+     * Creates a deleter using the parent writer's no-argument construction path.
+     */
+    public ElasticsearchPersistDeleter() {
+        super();
+    }
+
+    /**
+     * Creates a deleter with an explicit writer configuration.
+     *
+     * @param config writer configuration supplying the default index and type
+     */
+    public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) {
+        super(config);
+    }
+
+    /**
+     * Queues a delete for the document identified by the datum's metadata.
+     *
+     * <p>Only the metadata is consulted. The datum's document body is irrelevant to a delete,
+     * so it is not required to be present.
+     *
+     * @param streamsDatum datum whose metadata carries "id" and optionally "index" / "type"
+     * @throws NullPointerException if the datum, its metadata or the "id" entry is missing, or if
+     *     neither the metadata nor the configuration supplies an index / type
+     */
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+
+        Preconditions.checkNotNull(streamsDatum, "streamsDatum");
+        Preconditions.checkNotNull(streamsDatum.getMetadata(), "streamsDatum metadata");
+        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"), "streamsDatum metadata 'id'");
+
+        // Per-datum metadata wins; otherwise use the writer's configured defaults.
+        String index = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("index"))
+                .or(config.getIndex());
+        String type = Optional.fromNullable(
+                (String) streamsDatum.getMetadata().get("type"))
+                .or(config.getType());
+        String id = (String) streamsDatum.getMetadata().get("id");
+
+        delete(index, type, id);
+    }
+
+    /**
+     * Builds a {@link DeleteRequest} for a single document and queues it.
+     *
+     * @param index target index
+     * @param type  target document type
+     * @param id    id of the document to delete
+     * @throws NullPointerException if any argument is null
+     */
+    public void delete(String index, String type, String id) {
+
+        Preconditions.checkNotNull(index, "index");
+        Preconditions.checkNotNull(id, "id");
+        Preconditions.checkNotNull(type, "type");
+
+        // A delete must name an existing document, so the caller-supplied id is always used as-is.
+        DeleteRequest deleteRequest = new DeleteRequest()
+                .index(index)
+                .type(type)
+                .id(id);
+
+        add(deleteRequest);
+    }
+
+    /**
+     * Appends a delete request to the current bulk batch and lets the parent decide whether to flush.
+     *
+     * <p>The bulk request and batch counter are updated and the flush check is performed while
+     * holding this writer's monitor, so the three steps happen together.
+     *
+     * @param request delete request with a non-null index
+     * @throws NullPointerException if the request or its index is null
+     */
+    public void add(DeleteRequest request) {
+
+        Preconditions.checkNotNull(request, "request");
+        Preconditions.checkNotNull(request.index(), "request index");
+
+        synchronized (this) {
+            checkIndexImplications(request.index());
+
+            bulkRequest.add(request);
+            currentBatchItems.incrementAndGet();
+
+            // If the batch has reached a flush threshold, the parent sends it now.
+            checkForFlush();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..dbf7d25 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -56,96 +56,16 @@ import java.util.*;
//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
- private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
- private final static NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
-
- protected ElasticsearchClientManager manager;
- protected Client client;
- private String parentID = null;
- protected BulkRequestBuilder bulkRequest;
- private OutputStreamWriter currentWriter = null;
-
- protected String index = null;
- protected String type = null;
- private int batchSize = 50;
- private int totalRecordsWritten = 0;
- private boolean veryLargeBulk = false; // by default this setting is set to false
-
- private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
- private static final long WAITING_DOCS_LIMIT = 10000;
-
- public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
-
- private volatile int totalSent = 0;
- private volatile int totalSeconds = 0;
- private volatile int totalOk = 0;
- private volatile int totalFailed = 0;
- private volatile int totalBatchCount = 0;
- private volatile long totalSizeInBytes = 0;
-
- private volatile long batchSizeInBytes = 0;
- private volatile int batchItemsSent = 0;
-
- public void setIndex(String index) {
- this.index = index;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- private final List<String> affectedIndexes = new ArrayList<String>();
+public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
- public long getFlushThresholdSizeInBytes() {
- return flushThresholdSizeInBytes;
- }
-
- public int getTotalBatchCount() {
- return totalBatchCount;
- }
-
- public long getBatchSizeInBytes() {
- return batchSizeInBytes;
- }
-
- public int getBatchItemsSent() {
- return batchItemsSent;
- }
-
- public void setFlushThresholdSizeInBytes(long sizeInBytes) {
- this.flushThresholdSizeInBytes = sizeInBytes;
- }
-
- Thread task;
-
- protected volatile Queue<StreamsDatum> persistQueue;
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- private ElasticsearchConfiguration config;
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
public ElasticsearchPersistUpdater() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ super();
}
- public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
- this.config = config;
- }
-
- private static final int BYTES_IN_MB = 1024 * 1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
-
- public boolean isConnected() {
- return (client != null);
+ public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
+ super(config);
}
@Override
@@ -156,15 +76,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
Preconditions.checkNotNull(streamsDatum.getMetadata());
Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ String index;
+ String type;
String id;
String json;
try {
- json = mapper.writeValueAsString(streamsDatum.getDocument());
+ json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+ index = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("index"))
+ .or(config.getIndex());
+ type = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("type"))
+ .or(config.getType());
id = (String) streamsDatum.getMetadata().get("id");
- add(index, type, id, json);
+ update(index, type, id, json);
} catch (JsonProcessingException e) {
LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
@@ -172,184 +100,7 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
}
}
- public void start() {
-
- manager = new ElasticsearchClientManager(config);
- client = manager.getClient();
-
- LOGGER.info(client.toString());
- }
-
- public void cleanUp() {
-
- try {
- flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- close();
- }
-
- @Override
- public void close() {
- try {
- // before they close, check to ensure that
- this.flush();
-
- int count = 0;
- // We are going to give it 5 minutes.
- while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
- Thread.sleep(50);
-
- if (this.getTotalOutstanding() > 0) {
- LOGGER.error("We never cleared our buffer");
- }
-
-
- for (String indexName : this.getAffectedIndexes()) {
- createIndexIfMissing(indexName);
-
- if (this.veryLargeBulk) {
- LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
- // They are in 'very large bulk' mode and the process is finished. We now want to turn the
- // refreshing back on.
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
-
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
-
- checkIndexImplications(indexName);
-
- LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
- this.manager.getClient()
- .admin()
- .indices()
- .prepareRefresh(indexName)
- .execute()
- .actionGet();
- }
-
- LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
-
- } catch (Exception e) {
- // this line of code should be logically unreachable.
- LOGGER.warn("This is unexpected: {}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- @Override
- public void flush() throws IOException {
- flushInternal();
- }
-
- public void flushInternal() {
- synchronized (this) {
- // we do not have a working bulk request, we can just exit here.
- if (this.bulkRequest == null || batchItemsSent == 0)
- return;
-
- // call the flush command.
- flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
-
- // null the flush request, this will be created in the 'add' function below
- this.bulkRequest = null;
-
- // record the proper statistics, and add it to our totals.
- this.totalSizeInBytes += this.batchSizeInBytes;
- this.totalSent += batchItemsSent;
-
- // reset the current batch statistics
- this.batchSizeInBytes = 0;
- this.batchItemsSent = 0;
-
- try {
- int count = 0;
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
- /****************************************************************************
- * Author:
- * Smashew
- *
- * Date:
- * 2013-10-20
- *
- * Note:
- * With the information that we have on hand. We need to develop a heuristic
- * that will determine when the cluster is having a problem indexing records
- * by telling it to pause and wait for it to catch back up. A
- *
- * There is an impact to us, the caller, whenever this happens as well. Items
- * that are not yet fully indexed by the server sit in a queue, on the client
- * that can cause the heap to overflow. This has been seen when re-indexing
- * large amounts of data to a small cluster. The "deletes" + "indexes" can
- * cause the server to have many 'outstandingItems" in queue. Running this
- * software with large amounts of data, on a small cluster, can re-create
- * this problem.
- *
- * DO NOT DELETE THESE LINES
- ****************************************************************************/
-
- // wait for the flush to catch up. We are going to cap this at
- while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
- Thread.sleep(10);
-
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
- LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
- }
- } catch (Exception e) {
- LOGGER.info("We were broken from our loop: {}", e.getMessage());
- }
- }
- }
-
- private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
- bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- if (bulkItemResponses.hasFailures())
- LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
-
- long thisFailed = 0;
- long thisOk = 0;
- long thisMillis = bulkItemResponses.getTookInMillis();
-
- // keep track of the number of totalFailed and items that we have totalOk.
- for (BulkItemResponse resp : bulkItemResponses.getItems()) {
- if (resp.isFailed())
- thisFailed++;
- else
- thisOk++;
- }
-
- totalOk += thisOk;
- totalFailed += thisFailed;
- totalSeconds += (thisMillis / 1000);
-
- if (thisSent != (thisOk + thisFailed))
- LOGGER.error("We sent more items than this");
-
- LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
- MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
- MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
- }
-
- @Override
- public void onFailure(Throwable e) {
- LOGGER.error("Error bulk loading: {}", e.getMessage());
- e.printStackTrace();
- }
- });
-
- this.notify();
- }
-
- public void add(String indexName, String type, String id, String json) {
+ public void update(String indexName, String type, String id, String json) {
UpdateRequest updateRequest;
Preconditions.checkNotNull(id);
@@ -366,137 +117,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
}
- public void add(UpdateRequest updateRequest) {
- Preconditions.checkNotNull(updateRequest);
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(updateRequest.index());
- bulkRequest.add(updateRequest);
- try {
- Optional<Integer> size = Objects.firstNonNull(
- Optional.fromNullable(updateRequest.doc().source().length()),
- Optional.fromNullable(updateRequest.script().length()));
- trackItemAndBytesWritten(size.get().longValue());
- } catch (NullPointerException x) {
- trackItemAndBytesWritten(1000);
- }
- }
- }
+ public void add(UpdateRequest request) {
- private void trackItemAndBytesWritten(long sizeInBytes) {
- batchItemsSent++;
- batchSizeInBytes += sizeInBytes;
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(request.index());
- // If our queue is larger than our flush threashold, then we should flush the queue.
- if (batchSizeInBytes > flushThresholdSizeInBytes)
- flushInternal();
- }
-
- private void checkAndCreateBulkRequest() {
- // Synchronize to ensure that we don't lose any records
+ // If our queue is larger than our flush threshold, then we should flush the queue.
synchronized (this) {
- if (bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- }
- }
-
- private void checkIndexImplications(String indexName) {
-
- // check to see if we have seen this index before.
- if (this.affectedIndexes.contains(indexName))
- return;
-
- // we haven't log this index.
- this.affectedIndexes.add(indexName);
-
- // Check to see if we are in 'veryLargeBulk' mode
- // if we aren't, exit early
- if (!this.veryLargeBulk)
- return;
-
-
- // They are in 'very large bulk' mode we want to turn off refreshing the index.
-
- // Create a request then add the setting to tell it to stop refreshing the interval
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
-
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
-
- public void createIndexIfMissing(String indexName) {
- if (!this.manager.getClient()
- .admin()
- .indices()
- .exists(new IndicesExistsRequest(indexName))
- .actionGet()
- .isExists()) {
- // It does not exist... So we are going to need to create the index.
- // we are going to assume that the 'templates' that we have loaded into
- // elasticsearch are sufficient to ensure the index is being created properly.
- CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
-
- if (response.isAcknowledged()) {
- LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
- } else {
- LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
- LOGGER.error("Error Message: {}", response.toString());
- throw new RuntimeException("Unable to create index " + indexName);
- }
- }
- }
+ checkIndexImplications(request.index());
- public void add(String indexName, String type, Map<String, Object> toImport) {
- for (String id : toImport.keySet())
- add(indexName, type, id, (String) toImport.get(id));
- }
-
- private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
- Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
- for (String toAddId : workingBatch.keySet())
- if (!invalidIDs.contains(toAddId))
- add(index, type, toAddId, workingBatch.get(toAddId));
-
- LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
- }
-
-
- private Set<String> checkIds(Set<String> input, String index, String type) {
+ bulkRequest.add(request);
- IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
+ currentBatchBytes.addAndGet(request.doc().source().length());
+ currentBatchItems.incrementAndGet();
- for (String s : input)
- idsFilterBuilder.addIds(s);
-
- SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
- .prepareSearch(index)
- .setTypes(type)
- .setQuery(idsFilterBuilder)
- .addField("_id")
- .setSize(input.size());
-
- SearchHits hits = searchRequestBuilder.execute()
- .actionGet()
- .getHits();
-
- Set<String> toReturn = new HashSet<String>();
-
- for (SearchHit hit : hits) {
- toReturn.add(hit.getId());
+ checkForFlush();
}
- return toReturn;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 05b0ef2..b83918d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
@@ -66,14 +67,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
//A document should have to wait no more than 10s to get flushed
private static final long DEFAULT_MAX_WAIT = 10000;
- private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+ protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
- private final List<String> affectedIndexes = new ArrayList<String>();
+ protected final List<String> affectedIndexes = new ArrayList<String>();
- private final ElasticsearchClientManager manager;
- private final ElasticsearchWriterConfiguration config;
+ protected final ElasticsearchClientManager manager;
+ protected final ElasticsearchWriterConfiguration config;
- private BulkRequestBuilder bulkRequest;
+ protected BulkRequestBuilder bulkRequest;
private boolean veryLargeBulk = false; // by default this setting is set to false
private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
@@ -87,8 +88,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
private final AtomicInteger batchesSent = new AtomicInteger(0);
private final AtomicInteger batchesResponded = new AtomicInteger(0);
- private final AtomicLong currentBatchItems = new AtomicLong(0);
- private final AtomicLong currentBatchBytes = new AtomicLong(0);
+ protected final AtomicLong currentBatchItems = new AtomicLong(0);
+ protected final AtomicLong currentBatchBytes = new AtomicLong(0);
private final AtomicLong totalSent = new AtomicLong(0);
private final AtomicLong totalSeconds = new AtomicLong(0);
@@ -142,8 +143,16 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
checkForBackOff();
+ String index = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("index"))
+ .or(config.getIndex());
+ String type = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("type"))
+ .or(config.getType());
+ String id = (String) streamsDatum.getMetadata().get("id");
+
try {
- add(config.getIndex(), config.getType(), streamsDatum.getId(),
+ add(index, type, id,
streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
convertAndAppendMetadata(streamsDatum));
} catch (Throwable e) {
@@ -333,7 +342,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private void checkForFlush() {
+ protected void checkForFlush() {
synchronized (this) {
if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
this.currentBatchItems.get() >= this.flushThresholdsRecords ||
@@ -344,7 +353,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private void checkIndexImplications(String indexName) {
+ protected void checkIndexImplications(String indexName) {
// We need this to be safe across all writers that are currently being executed
synchronized (ElasticsearchPersistWriter.class) {
[2/2] git commit: added timestamp logic; removed code and logic deprecated by STREAMS-116
Posted by sb...@apache.org.
added timestamp logic
removed code and logic deprecated by STREAMS-116
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0103a967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0103a967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0103a967
Branch: refs/heads/STREAMS-112
Commit: 0103a967b0f65ead42215515b323771af7b66421
Parents: c2d59a2
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Aug 4 14:02:41 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Aug 4 14:02:41 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistReader.java | 18 ++--
.../elasticsearch/ElasticsearchQuery.java | 95 ++------------------
2 files changed, 16 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0103a967/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 7ba9b33..5411094 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -194,15 +194,19 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
ObjectNode jsonObject = null;
try {
jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+ item = new StreamsDatum(jsonObject, hit.getId());
+ item.getMetadata().put("id", hit.getId());
+ item.getMetadata().put("index", hit.getIndex());
+ item.getMetadata().put("type", hit.getType());
+ if( hit.fields().containsKey("_timestamp")) {
+ DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue());
+ item.setTimestamp(timestamp);
+ }
+ reader.write(item);
} catch (IOException e) {
- e.printStackTrace();
- break;
+ LOGGER.warn("Unable to process json source: ", hit.getSourceAsString());
}
- item = new StreamsDatum(jsonObject, hit.getId());
- item.getMetadata().put("id", hit.getId());
- item.getMetadata().put("index", hit.getIndex());
- item.getMetadata().put("type", hit.getType());
- reader.write(item);
+
}
try {
Thread.sleep(new Random().nextInt(100));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0103a967/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index defd9dc..7699bd4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -117,14 +117,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
this.filterBuilder = filterBuilder;
}
- public void setWithfields(String[] withfields) {
- this.withfields = withfields;
- }
-
- public void setWithoutfields(String[] withoutfields) {
- this.withoutfields = withoutfields;
- }
-
public void execute(Object o) {
// If we haven't already set up the search, then set up the search.
@@ -133,8 +125,12 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
search = elasticsearchClientManager.getClient()
.prepareSearch(indexes.toArray(new String[0]))
.setSearchType(SearchType.SCAN)
+ .setExplain(true)
+ .addField("*")
+ .setFetchSource(true)
.setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
- .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+ .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+ .addField("_timestamp");
String searchJson;
if( config.getSearch() != null ) {
@@ -161,23 +157,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
if (this.types != null && this.types.size() > 0)
search = search.setTypes(types.toArray(new String[0]));
- Integer clauses = 0;
- if (this.withfields != null || this.withoutfields != null) {
- if (this.withfields != null)
- clauses += this.withfields.length;
- if (this.withoutfields != null)
- clauses += this.withoutfields.length;
- }
-
- List<FilterBuilder> filterList = buildFilterList();
-
- FilterBuilder allFilters = andFilters(filterList);
-
- if (clauses > 0) {
- // search.setPostFilter(allFilters);
- search = search.setPostFilter(allFilters);
- }
-
// TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
if (this.random)
search = search.addSort(SortBuilders.scriptSort("random()", "number"));
@@ -258,68 +237,4 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
}
- // copied from elasticsearch
- // if we need this again we should factor it out into a utility
- private FilterBuilder andFilters(List<FilterBuilder> filters) {
- if (filters == null || filters.size() == 0)
- return null;
-
- FilterBuilder toReturn = filters.get(0);
-
- for (int i = 1; i < filters.size(); i++)
- toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
-
- return toReturn;
- }
-
- private FilterBuilder orFilters(List<FilterBuilder> filters) {
- if (filters == null || filters.size() == 0)
- return null;
-
- FilterBuilder toReturn = filters.get(0);
-
- for (int i = 1; i < filters.size(); i++)
- toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
-
- return toReturn;
- }
-
- private List<FilterBuilder> buildFilterList() {
-
- // If any withfields are specified, require that field be present
- // There must a value set also for the document to be processed
- // If any withoutfields are specified, require that field not be present
- // Document will be picked up even if present, if they do not have at least one value
- // this is annoying as it majorly impacts runtime
- // might be able to change behavior using null_field
-
- ArrayList<FilterBuilder> filterList = Lists.newArrayList();
-
- // If any withfields are specified, require that field be present
- // There must a value set also for the document to be processed
- if (this.withfields != null && this.withfields.length > 0) {
- ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
- for (String withfield : this.withfields) {
- FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
- withFilterList.add(withFilter);
- }
- //filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
- filterList.add(withFilterList.get(0));
- }
- // If any withoutfields are specified, require that field not be present
- // Document will be picked up even if present, if they do not have at least one value
- // this is annoying as it majorly impacts runtime
- // might be able to change behavior using null_field
- if (this.withoutfields != null && this.withoutfields.length > 0) {
- ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
- for (String withoutfield : this.withoutfields) {
- FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
- withoutFilterList.add(withoutFilter);
- }
- //filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
- filterList.add(withoutFilterList.get(0));
- }
-
- return filterList;
- }
}