You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/21 20:29:56 UTC

[03/11] git commit: STREAMS-131 | Closing connections

STREAMS-131 | Closing connections


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78aa10f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78aa10f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78aa10f4

Branch: refs/heads/master
Commit: 78aa10f4f371b1b7aee1c4e21231956dad015268
Parents: a5e6012
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 10:47:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 10:47:29 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 +++
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 4 +++-
 .../org/apache/streams/elasticsearch/ElasticsearchQuery.java     | 4 ++++
 3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4196a46..7ba9b33 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -128,6 +128,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
     public void cleanUp() {
         this.shutdownAndAwaitTermination(executor);
         LOGGER.info("PersistReader done");
+        if(elasticsearchQuery != null) {
+            elasticsearchQuery.cleanUp();
+        }
     }
 
     //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 72145e1..caf5ec3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,6 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             refreshIndexes();
 
             LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+            manager.stop();
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.
@@ -248,7 +249,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                 Thread.sleep(1);
                 counter++;
             } catch(InterruptedException ie) {
-                // No Operation
+                LOGGER.warn("Catchup was interrupted.  Data may be lost");
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index fbc62c4..4dd3d28 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -247,6 +247,10 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     public void remove() {
     }
 
+    public void cleanUp() {
+        this.elasticsearchClientManager.stop();
+    }
+
     protected boolean isCompleted() {
         return totalRead >= this.limit && hasRecords();
     }