You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/10 19:00:26 UTC
[1/6] git commit: STREAMS-131 | Updated stop to cleanup tasks and shutdown handler
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-131 [created] 32bf3c3fa
STREAMS-131 | Updated stop to cleanup tasks and shutdown handler
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a2e0bfc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a2e0bfc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a2e0bfc2
Branch: refs/heads/STREAMS-131
Commit: a2e0bfc248525a80f6db21540b37c1ba2aa8d274
Parents: 38ed41a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:32:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:32:00 2014 -0400
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 34 +++++++++++++-------
.../local/tasks/StreamsPersistWriterTask.java | 1 +
.../local/tasks/StreamsProcessorTask.java | 1 +
.../local/tasks/StreamsProviderTask.java | 1 +
4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index c287a34..81d325f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -55,6 +55,7 @@ public class LocalStreamBuilder implements StreamBuilder {
private int monitorTasks;
private LocalStreamProcessMonitorThread monitorThread;
private Map<String, List<StreamsTask>> tasks;
+ private Thread shutdownHook;
/**
*
@@ -91,6 +92,14 @@ public class LocalStreamBuilder implements StreamBuilder {
this.streamConfig = streamConfig;
this.totalTasks = 0;
this.monitorTasks = 0;
+ final LocalStreamBuilder self = this;
+ this.shutdownHook = new Thread() {
+ @Override
+ public void run() {
+ LOGGER.debug("Shutdown hook received. Beginning shutdown");
+ self.stop();
+ }
+ };
}
@Override
@@ -186,24 +195,23 @@ public class LocalStreamBuilder implements StreamBuilder {
Thread.sleep(3000);
}
}
- LOGGER.debug("Components are no longer running or timed out due to completion");
- shutdown(tasks);
+ LOGGER.debug("Components are no longer running or timed out");
} catch (InterruptedException e){
- forceShutdown(tasks);
+ LOGGER.warn("Runtime interrupted. Beginning shutdown");
+ } finally{
+ stop();
}
}
private void attachShutdownHandler() {
- final LocalStreamBuilder self = this;
LOGGER.debug("Attaching shutdown handler");
- Runtime.getRuntime().addShutdownHook(new Thread(){
- @Override
- public void run() {
- LOGGER.debug("Shutdown hook received. Beginning shutdown");
- self.stop();
- }
- });
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ private void detachShutdownHandler() {
+ LOGGER.debug("Detaching shutdown handler");
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
@@ -232,7 +240,7 @@ public class LocalStreamBuilder implements StreamBuilder {
protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
LOGGER.info("Attempting to shutdown tasks");
- monitorThread.shutdown();
+ this.monitorThread.shutdown();
this.executor.shutdown();
//complete stream shut down gracfully
for(StreamComponent prov : this.providers.values()) {
@@ -333,6 +341,8 @@ public class LocalStreamBuilder implements StreamBuilder {
shutdown(tasks);
} catch (Exception e) {
forceShutdown(tasks);
+ } finally {
+ detachShutdownHandler();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 4ec5fe9..bdda66a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,6 +123,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
@Override
public void stopTask() {
this.keepRunning.set(false);
+ this.writer.cleanUp();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 0d3f54a..7fbad16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,6 +63,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
@Override
public void stopTask() {
this.keepRunning.set(false);
+ this.processor.cleanUp();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..9eba6cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,6 +110,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
public void stopTask() {
LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
this.keepRunning.set(false);
+ this.provider.cleanUp();
}
@Override
[3/6] git commit: STREAMS-131 | Closing connections
Posted by mf...@apache.org.
STREAMS-131 | Closing connections
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78aa10f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78aa10f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78aa10f4
Branch: refs/heads/STREAMS-131
Commit: 78aa10f4f371b1b7aee1c4e21231956dad015268
Parents: a5e6012
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 10:47:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 10:47:29 2014 -0400
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 +++
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 4 +++-
.../org/apache/streams/elasticsearch/ElasticsearchQuery.java | 4 ++++
3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4196a46..7ba9b33 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -128,6 +128,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
public void cleanUp() {
this.shutdownAndAwaitTermination(executor);
LOGGER.info("PersistReader done");
+ if(elasticsearchQuery != null) {
+ elasticsearchQuery.cleanUp();
+ }
}
//The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 72145e1..caf5ec3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,6 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
refreshIndexes();
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+ manager.stop();
} catch (Throwable e) {
// this line of code should be logically unreachable.
@@ -248,7 +249,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
Thread.sleep(1);
counter++;
} catch(InterruptedException ie) {
- // No Operation
+ LOGGER.warn("Catchup was interrupted. Data may be lost");
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index fbc62c4..4dd3d28 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -247,6 +247,10 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
public void remove() {
}
+ public void cleanUp() {
+ this.elasticsearchClientManager.stop();
+ }
+
protected boolean isCompleted() {
return totalRead >= this.limit && hasRecords();
}
[6/6] git commit: STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup
Posted by mf...@apache.org.
STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/32bf3c3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/32bf3c3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/32bf3c3f
Branch: refs/heads/STREAMS-131
Commit: 32bf3c3fa97473b0eea0e272fd67023eef0a0f65
Parents: b45256c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 13:00:07 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 13:00:07 2014 -0400
----------------------------------------------------------------------
.../streams/mongo/MongoPersistWriter.java | 21 ++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/32bf3c3f/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index dc8d9cf..773327c 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -130,13 +130,30 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
try {
flush();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error flushing", e);
}
try {
close();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error closing", e);
}
+ try {
+ backgroundFlushTask.shutdown();
+ // Wait a while for existing tasks to terminate
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ backgroundFlushTask.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+
}
@Override
[4/6] git commit: STREAMS-131 | Fixed bug in elasticsearch client manager close
Posted by mf...@apache.org.
STREAMS-131 | Fixed bug in elasticsearch client manager close
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d1fc31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d1fc31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d1fc31a
Branch: refs/heads/STREAMS-131
Commit: 0d1fc31a87d50fed37860667e34b5d0e1fff6b35
Parents: 78aa10f
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:12:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:12:18 2014 -0400
----------------------------------------------------------------------
.../streams/elasticsearch/ElasticsearchClientManager.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d1fc31a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 340d2b9..d107e70 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -80,7 +80,7 @@ public class ElasticsearchClientManager {
}
public boolean isOnOrAfterVersion(Version version) {
- return ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getVersion().onOrAfter(version);
+ return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
}
public void start() throws Exception {
@@ -110,12 +110,12 @@ public class ElasticsearchClientManager {
public synchronized void stop() {
// Terminate the elasticsearch cluster
// Check to see if we have a client.
- if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.toString())) {
+ if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
// Close the client
- ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getClient().close();
+ ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
// Remove it so that it isn't in memory any more.
- ALL_CLIENTS.remove(this.elasticsearchConfiguration.toString());
+ ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
}
}
[5/6] git commit: STREAMS-131 | Updated elasticsearch persist writer to close timer
Posted by mf...@apache.org.
STREAMS-131 | Updated elasticsearch persist writer to close timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b45256c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b45256c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b45256c3
Branch: refs/heads/STREAMS-131
Commit: b45256c3ab87ba385fdee5170528dcffabecac57
Parents: 0d1fc31
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:20:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:20:03 2014 -0400
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b45256c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index caf5ec3..5b90e8b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -176,6 +176,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
manager.stop();
+ timer.cancel();
} catch (Throwable e) {
// this line of code should be logically unreachable.
[2/6] git commit: STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down
Posted by mf...@apache.org.
STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5e60126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5e60126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5e60126
Branch: refs/heads/STREAMS-131
Commit: a5e60126847d40c079ac6d05dc5ece92f62607e9
Parents: a2e0bfc
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:52:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:52:46 2014 -0400
----------------------------------------------------------------------
.../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5e60126/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 81d325f..25f9fe7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -97,7 +97,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public void run() {
LOGGER.debug("Shutdown hook received. Beginning shutdown");
- self.stop();
+ self.stopInternal(true);
}
};
}
@@ -337,12 +337,18 @@ public class LocalStreamBuilder implements StreamBuilder {
*/
@Override
public void stop() {
+ stopInternal(false);
+ }
+
+ protected void stopInternal(boolean systemExiting) {
try {
shutdown(tasks);
} catch (Exception e) {
forceShutdown(tasks);
} finally {
- detachShutdownHandler();
+ if(!systemExiting) {
+ detachShutdownHandler();
+ }
}
}