You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/08/28 22:14:44 UTC

incubator-streams git commit: resolves STREAMS-358: a refresh now occurs only when explicitly requested

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-358 [created] 193151fb6


resolves STREAMS-358
A refresh now occurs during cleanup only when explicitly requested via the new `refresh` writer configuration option (default: false).


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/193151fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/193151fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/193151fb

Branch: refs/heads/STREAMS-358
Commit: 193151fb631052a1d589b6122d62627b1c6f49cb
Parents: e52cb25
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Aug 28 15:14:39 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Aug 28 15:14:39 2015 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 63 ++++++++++++--------
 .../elasticsearch/ElasticsearchQuery.java       |  6 +-
 .../ElasticsearchWriterConfiguration.json       |  7 ++-
 3 files changed, 48 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index d4cd84a..549920b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -205,9 +205,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             LOGGER.warn("This is unexpected: {}", e);
         } finally {
 
-            refreshIndexes();
+            if( veryLargeBulk == true ) {
+                resetRefreshInterval();
+            }
 
-            LOGGER.debug("refreshIndexes completed");
+            if( config.getRefresh() ) {
+                refreshIndexes();
+                LOGGER.debug("refreshIndexes completed");
+            }
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
             timer.cancel();
@@ -216,7 +221,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private void refreshIndexes() {
+    private void resetRefreshInterval() {
         for (String indexName : this.affectedIndexes) {
 
             if (this.veryLargeBulk) {
@@ -233,16 +238,21 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                         .updateSettings(updateSettingsRequest)
                         .actionGet();
             }
+        }
+    }
 
-            checkIndexImplications(indexName);
+    private void refreshIndexes() {
+        for (String indexName : this.affectedIndexes) {
 
-            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
-            this.manager.getClient()
-                    .admin()
-                    .indices()
-                    .prepareRefresh(indexName)
-                    .execute()
-                    .actionGet();
+            if (config.getRefresh()) {
+                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+                this.manager.getClient()
+                        .admin()
+                        .indices()
+                        .prepareRefresh(indexName)
+                        .execute()
+                        .actionGet();
+            }
         }
     }
 
@@ -402,22 +412,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             // we haven't log this index.
             this.affectedIndexes.add(indexName);
 
-            // Check to see if we are in 'veryLargeBulk' mode
-            // if we aren't, exit early
-            if (this.veryLargeBulk) {
+        }
+    }
 
-                // They are in 'very large bulk' mode we want to turn off refreshing the index.
-                // Create a request then add the setting to tell it to stop refreshing the interval
-                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+    protected void disableRefresh() {
 
-                // submit to ElasticSearch
-                this.manager.getClient()
-                        .admin()
-                        .indices()
-                        .updateSettings(updateSettingsRequest)
-                        .actionGet();
-            }
+        for (String indexName : this.affectedIndexes) {
+            // They are in 'very large bulk' mode we want to turn off refreshing the index.
+            // Create a request then add the setting to tell it to stop refreshing the interval
+            UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+            updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+
+            // submit to ElasticSearch
+            this.manager.getClient()
+                    .admin()
+                    .indices()
+                    .updateSettings(updateSettingsRequest)
+                    .actionGet();
         }
     }
 
@@ -468,6 +479,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             }
         }, this.flushThresholdTime, this.flushThresholdTime);
 
+        if( veryLargeBulk )
+            disableRefresh();
     }
 
     private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 2ea3624..f92c1ef 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -126,23 +126,25 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
                     .setScroll(scrollTimeout)
                     .addField("_timestamp");
 
+            LOGGER.debug("Search source: " + search.toString());
+
             String searchJson;
             if( config.getSearch() != null ) {
                 LOGGER.debug("Have config in Reader: " + config.getSearch().toString());
 
                 try {
                     searchJson = mapper.writeValueAsString(config.getSearch());
-                    LOGGER.debug("Setting source: " + searchJson);
+                    LOGGER.debug("Extra source: " + searchJson);
                     search = search.setExtraSource(searchJson);
 
                 } catch (JsonProcessingException e) {
                     LOGGER.warn("Could not apply _search supplied by config", e.getMessage());
                 }
 
-                LOGGER.debug("Search Source is now " + search.toString());
 
             }
 
+            LOGGER.debug("Final Search: " + search.internalBuilder().toString());
 
             if (this.queryBuilder != null)
                 search = search.setQuery(this.queryBuilder);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/193151fb/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 6000ea8..22317d0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -57,6 +57,11 @@
          "forceUseConfig": {
              "type": "boolean",
              "description": "Whether or not we force the values that are set in the configuration"
-         }
+         },
+        "refresh": {
+            "type": "boolean",
+            "description": "Whether to refresh during cleanup",
+            "default": "false"
+        }
     }
 }
\ No newline at end of file