You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/08/28 22:14:44 UTC
incubator-streams git commit: resolves STREAMS-358: an index refresh now
occurs only when it is explicitly requested
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-358 [created] 193151fb6
resolves STREAMS-358
An index refresh now occurs during cleanup only when it is explicitly requested via the new "refresh" writer configuration option.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/193151fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/193151fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/193151fb
Branch: refs/heads/STREAMS-358
Commit: 193151fb631052a1d589b6122d62627b1c6f49cb
Parents: e52cb25
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Aug 28 15:14:39 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Aug 28 15:14:39 2015 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 63 ++++++++++++--------
.../elasticsearch/ElasticsearchQuery.java | 6 +-
.../ElasticsearchWriterConfiguration.json | 7 ++-
3 files changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index d4cd84a..549920b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -205,9 +205,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
LOGGER.warn("This is unexpected: {}", e);
} finally {
- refreshIndexes();
+ if( veryLargeBulk == true ) {
+ resetRefreshInterval();
+ }
- LOGGER.debug("refreshIndexes completed");
+ if( config.getRefresh() ) {
+ refreshIndexes();
+ LOGGER.debug("refreshIndexes completed");
+ }
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
timer.cancel();
@@ -216,7 +221,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- private void refreshIndexes() {
+ private void resetRefreshInterval() {
for (String indexName : this.affectedIndexes) {
if (this.veryLargeBulk) {
@@ -233,16 +238,21 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
.updateSettings(updateSettingsRequest)
.actionGet();
}
+ }
+ }
- checkIndexImplications(indexName);
+ private void refreshIndexes() {
+ for (String indexName : this.affectedIndexes) {
- LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
- this.manager.getClient()
- .admin()
- .indices()
- .prepareRefresh(indexName)
- .execute()
- .actionGet();
+ if (config.getRefresh()) {
+ LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .prepareRefresh(indexName)
+ .execute()
+ .actionGet();
+ }
}
}
@@ -402,22 +412,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
// we haven't log this index.
this.affectedIndexes.add(indexName);
- // Check to see if we are in 'veryLargeBulk' mode
- // if we aren't, exit early
- if (this.veryLargeBulk) {
+ }
+ }
- // They are in 'very large bulk' mode we want to turn off refreshing the index.
- // Create a request then add the setting to tell it to stop refreshing the interval
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+ protected void disableRefresh() {
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
+ for (String indexName : this.affectedIndexes) {
+ // They are in 'very large bulk' mode we want to turn off refreshing the index.
+ // Create a request then add the setting to tell it to stop refreshing the interval
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+ updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+
+ // submit to ElasticSearch
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .updateSettings(updateSettingsRequest)
+ .actionGet();
}
}
@@ -468,6 +479,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}, this.flushThresholdTime, this.flushThresholdTime);
+ if( veryLargeBulk )
+ disableRefresh();
}
private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 2ea3624..f92c1ef 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -126,23 +126,25 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
.setScroll(scrollTimeout)
.addField("_timestamp");
+ LOGGER.debug("Search source: " + search.toString());
+
String searchJson;
if( config.getSearch() != null ) {
LOGGER.debug("Have config in Reader: " + config.getSearch().toString());
try {
searchJson = mapper.writeValueAsString(config.getSearch());
- LOGGER.debug("Setting source: " + searchJson);
+ LOGGER.debug("Extra source: " + searchJson);
search = search.setExtraSource(searchJson);
} catch (JsonProcessingException e) {
LOGGER.warn("Could not apply _search supplied by config", e.getMessage());
}
- LOGGER.debug("Search Source is now " + search.toString());
}
+ LOGGER.debug("Final Search: " + search.internalBuilder().toString());
if (this.queryBuilder != null)
search = search.setQuery(this.queryBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 6000ea8..22317d0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -57,6 +57,11 @@
"forceUseConfig": {
"type": "boolean",
"description": "Whether or not we force the values that are set in the configuration"
- }
+ },
+ "refresh": {
+ "type": "boolean",
+ "description": "Whether to refresh during cleanup",
+ "default": "false"
+ }
}
}
\ No newline at end of file