You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/21 20:29:54 UTC
[01/11] git commit: STREAMS-131 | Updated stop to cleanup tasks and shutdown handler
Repository: incubator-streams
Updated Branches:
refs/heads/master 7c5e29007 -> da3bc91a8
STREAMS-131 | Updated stop to cleanup tasks and shutdown handler
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a2e0bfc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a2e0bfc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a2e0bfc2
Branch: refs/heads/master
Commit: a2e0bfc248525a80f6db21540b37c1ba2aa8d274
Parents: 38ed41a
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:32:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:32:00 2014 -0400
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 34 +++++++++++++-------
.../local/tasks/StreamsPersistWriterTask.java | 1 +
.../local/tasks/StreamsProcessorTask.java | 1 +
.../local/tasks/StreamsProviderTask.java | 1 +
4 files changed, 25 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index c287a34..81d325f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -55,6 +55,7 @@ public class LocalStreamBuilder implements StreamBuilder {
private int monitorTasks;
private LocalStreamProcessMonitorThread monitorThread;
private Map<String, List<StreamsTask>> tasks;
+ private Thread shutdownHook;
/**
*
@@ -91,6 +92,14 @@ public class LocalStreamBuilder implements StreamBuilder {
this.streamConfig = streamConfig;
this.totalTasks = 0;
this.monitorTasks = 0;
+ final LocalStreamBuilder self = this;
+ this.shutdownHook = new Thread() {
+ @Override
+ public void run() {
+ LOGGER.debug("Shutdown hook received. Beginning shutdown");
+ self.stop();
+ }
+ };
}
@Override
@@ -186,24 +195,23 @@ public class LocalStreamBuilder implements StreamBuilder {
Thread.sleep(3000);
}
}
- LOGGER.debug("Components are no longer running or timed out due to completion");
- shutdown(tasks);
+ LOGGER.debug("Components are no longer running or timed out");
} catch (InterruptedException e){
- forceShutdown(tasks);
+ LOGGER.warn("Runtime interrupted. Beginning shutdown");
+ } finally{
+ stop();
}
}
private void attachShutdownHandler() {
- final LocalStreamBuilder self = this;
LOGGER.debug("Attaching shutdown handler");
- Runtime.getRuntime().addShutdownHook(new Thread(){
- @Override
- public void run() {
- LOGGER.debug("Shutdown hook received. Beginning shutdown");
- self.stop();
- }
- });
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ private void detachShutdownHandler() {
+ LOGGER.debug("Detaching shutdown handler");
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
@@ -232,7 +240,7 @@ public class LocalStreamBuilder implements StreamBuilder {
protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
LOGGER.info("Attempting to shutdown tasks");
- monitorThread.shutdown();
+ this.monitorThread.shutdown();
this.executor.shutdown();
//complete stream shut down gracfully
for(StreamComponent prov : this.providers.values()) {
@@ -333,6 +341,8 @@ public class LocalStreamBuilder implements StreamBuilder {
shutdown(tasks);
} catch (Exception e) {
forceShutdown(tasks);
+ } finally {
+ detachShutdownHandler();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 4ec5fe9..bdda66a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,6 +123,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
@Override
public void stopTask() {
this.keepRunning.set(false);
+ this.writer.cleanUp();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 0d3f54a..7fbad16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,6 +63,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
@Override
public void stopTask() {
this.keepRunning.set(false);
+ this.processor.cleanUp();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a2e0bfc2/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..9eba6cd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,6 +110,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
public void stopTask() {
LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
this.keepRunning.set(false);
+ this.provider.cleanUp();
}
@Override
[07/11] git commit: STREAMS-131 | Removed errant change of cleanup call location
Posted by mf...@apache.org.
STREAMS-131 | Removed errant change of cleanup call location
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d7c073c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d7c073c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d7c073c5
Branch: refs/heads/master
Commit: d7c073c58888a0c61fa7d0e4eed4ab8277ca2a6d
Parents: 32bf3c3
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 14:55:25 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 14:55:25 2014 -0400
----------------------------------------------------------------------
.../org/apache/streams/local/tasks/StreamsPersistWriterTask.java | 1 -
.../java/org/apache/streams/local/tasks/StreamsProcessorTask.java | 3 +--
.../java/org/apache/streams/local/tasks/StreamsProviderTask.java | 1 -
3 files changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index bdda66a..4ec5fe9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -123,7 +123,6 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
@Override
public void stopTask() {
this.keepRunning.set(false);
- this.writer.cleanUp();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index 7fbad16..f3004b4 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -63,7 +63,6 @@ public class StreamsProcessorTask extends BaseStreamsTask {
@Override
public void stopTask() {
this.keepRunning.set(false);
- this.processor.cleanUp();
}
@Override
@@ -106,8 +105,8 @@ public class StreamsProcessorTask extends BaseStreamsTask {
}
} finally {
- this.processor.cleanUp();
this.isRunning.set(false);
+ this.processor.cleanUp();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d7c073c5/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 9eba6cd..88f6f48 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -110,7 +110,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
public void stopTask() {
LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
this.keepRunning.set(false);
- this.provider.cleanUp();
}
@Override
[06/11] git commit: STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup
Posted by mf...@apache.org.
STREAMS-131 | Updated mongo persist writer to close background flush task on cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/32bf3c3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/32bf3c3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/32bf3c3f
Branch: refs/heads/master
Commit: 32bf3c3fa97473b0eea0e272fd67023eef0a0f65
Parents: b45256c
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 13:00:07 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 13:00:07 2014 -0400
----------------------------------------------------------------------
.../streams/mongo/MongoPersistWriter.java | 21 ++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/32bf3c3f/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index dc8d9cf..773327c 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -130,13 +130,30 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
try {
flush();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error flushing", e);
}
try {
close();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error closing", e);
}
+ try {
+ backgroundFlushTask.shutdown();
+ // Wait a while for existing tasks to terminate
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ backgroundFlushTask.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+
}
@Override
[10/11] git commit: STREAMS-131 | Updated per code review comment
Posted by mf...@apache.org.
STREAMS-131 | Updated per code review comment
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e1f8eea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e1f8eea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e1f8eea7
Branch: refs/heads/master
Commit: e1f8eea7c1eb13608f6396472f7a9df3a07a4b44
Parents: c343a1b
Author: mfranklin <mf...@apache.org>
Authored: Fri Jul 18 12:13:23 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jul 18 12:13:23 2014 -0400
----------------------------------------------------------------------
.../java/org/apache/streams/local/tasks/StreamsProviderTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e1f8eea7/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 88f6f48..f5189eb 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -174,8 +174,8 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
LOGGER.error("Error in processing provider stream", e);
} finally {
LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
- this.keepRunning.set(false);
this.provider.cleanUp();
+ this.keepRunning.set(false);
}
}
[05/11] git commit: STREAMS-131 | Updated elasticsearch persist writer to close timer
Posted by mf...@apache.org.
STREAMS-131 | Updated elasticsearch persist writer to close timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b45256c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b45256c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b45256c3
Branch: refs/heads/master
Commit: b45256c3ab87ba385fdee5170528dcffabecac57
Parents: 0d1fc31
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:20:03 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:20:03 2014 -0400
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b45256c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index caf5ec3..5b90e8b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -176,6 +176,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
manager.stop();
+ timer.cancel();
} catch (Throwable e) {
// this line of code should be logically unreachable.
[09/11] git commit: STREAMS-131 | Fixed incorrect closing of Sysomos reader task
Posted by mf...@apache.org.
STREAMS-131 | Fixed incorrect closing of Sysomos reader task
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c343a1b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c343a1b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c343a1b0
Branch: refs/heads/master
Commit: c343a1b0063eeaac8f08ee84cc4fafcc5923d266
Parents: d41890d
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 09:02:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 09:02:46 2014 -0400
----------------------------------------------------------------------
.../org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c343a1b0/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 1dde4fa..3478671 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -79,7 +79,6 @@ public class SysomosHeartbeatStream implements Runnable {
executeRun();
} catch (Exception e) {
LOGGER.error("Error executing heartbeat stream", e);
- } finally {
shutdown();
}
}
[11/11] git commit: Merge remote-tracking branch 'origin/STREAMS-131'
Posted by mf...@apache.org.
Merge remote-tracking branch 'origin/STREAMS-131'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/da3bc91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/da3bc91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/da3bc91a
Branch: refs/heads/master
Commit: da3bc91a888df7c4d2fe3fb38cf340cc7de00477
Parents: 7c5e290 e1f8eea
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 21 13:31:11 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 21 13:31:11 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchClientManager.java | 8 ++--
.../ElasticsearchPersistReader.java | 3 ++
.../ElasticsearchPersistWriter.java | 4 +-
.../elasticsearch/ElasticsearchQuery.java | 3 ++
.../streams/mongo/MongoPersistWriter.java | 21 +++++++++-
.../provider/SysomosHeartbeatStream.java | 1 -
.../local/builders/LocalStreamBuilder.java | 40 ++++++++++++++------
.../local/tasks/StreamsProcessorTask.java | 2 +-
.../local/tasks/StreamsProviderTask.java | 2 +-
9 files changed, 62 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[08/11] git commit: STREAMS-131 | Because the elastic search components use the same static client, closing the reader can close the writer
Posted by mf...@apache.org.
STREAMS-131 | Because the elastic search components use the same static client, closing the reader can close the writer
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d41890d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d41890d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d41890d9
Branch: refs/heads/master
Commit: d41890d9adafc94a7d09227e57d0b811f293dbb9
Parents: d7c073c
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 08:23:01 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 08:23:01 2014 -0400
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 1 -
.../java/org/apache/streams/elasticsearch/ElasticsearchQuery.java | 1 -
2 files changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d41890d9/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 5b90e8b..05b0ef2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,7 +175,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
refreshIndexes();
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
- manager.stop();
timer.cancel();
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d41890d9/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 4dd3d28..defd9dc 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -248,7 +248,6 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
}
public void cleanUp() {
- this.elasticsearchClientManager.stop();
}
protected boolean isCompleted() {
[02/11] git commit: STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down
Posted by mf...@apache.org.
STREAMS-131 | Added flag to shutdown to indicate if the JVM is shutting down
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a5e60126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a5e60126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a5e60126
Branch: refs/heads/master
Commit: a5e60126847d40c079ac6d05dc5ece92f62607e9
Parents: a2e0bfc
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 09:52:46 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 09:52:46 2014 -0400
----------------------------------------------------------------------
.../apache/streams/local/builders/LocalStreamBuilder.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a5e60126/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 81d325f..25f9fe7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -97,7 +97,7 @@ public class LocalStreamBuilder implements StreamBuilder {
@Override
public void run() {
LOGGER.debug("Shutdown hook received. Beginning shutdown");
- self.stop();
+ self.stopInternal(true);
}
};
}
@@ -337,12 +337,18 @@ public class LocalStreamBuilder implements StreamBuilder {
*/
@Override
public void stop() {
+ stopInternal(false);
+ }
+
+ protected void stopInternal(boolean systemExiting) {
try {
shutdown(tasks);
} catch (Exception e) {
forceShutdown(tasks);
} finally {
- detachShutdownHandler();
+ if(!systemExiting) {
+ detachShutdownHandler();
+ }
}
}
[03/11] git commit: STREAMS-131 | Closing connections
Posted by mf...@apache.org.
STREAMS-131 | Closing connections
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/78aa10f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/78aa10f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/78aa10f4
Branch: refs/heads/master
Commit: 78aa10f4f371b1b7aee1c4e21231956dad015268
Parents: a5e6012
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 10:47:29 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 10:47:29 2014 -0400
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistReader.java | 3 +++
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 4 +++-
.../org/apache/streams/elasticsearch/ElasticsearchQuery.java | 4 ++++
3 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 4196a46..7ba9b33 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -128,6 +128,9 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali
public void cleanUp() {
this.shutdownAndAwaitTermination(executor);
LOGGER.info("PersistReader done");
+ if(elasticsearchQuery != null) {
+ elasticsearchQuery.cleanUp();
+ }
}
//The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 72145e1..caf5ec3 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -175,6 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
refreshIndexes();
LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+ manager.stop();
} catch (Throwable e) {
// this line of code should be logically unreachable.
@@ -248,7 +249,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
Thread.sleep(1);
counter++;
} catch(InterruptedException ie) {
- // No Operation
+ LOGGER.warn("Catchup was interrupted. Data may be lost");
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/78aa10f4/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index fbc62c4..4dd3d28 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -247,6 +247,10 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
public void remove() {
}
+ public void cleanUp() {
+ this.elasticsearchClientManager.stop();
+ }
+
protected boolean isCompleted() {
return totalRead >= this.limit && hasRecords();
}
[04/11] git commit: STREAMS-131 | Fixed bug in elasticsearch client manager close
Posted by mf...@apache.org.
STREAMS-131 | Fixed bug in elasticsearch client manager close
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0d1fc31a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0d1fc31a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0d1fc31a
Branch: refs/heads/master
Commit: 0d1fc31a87d50fed37860667e34b5d0e1fff6b35
Parents: 78aa10f
Author: mfranklin <mf...@apache.org>
Authored: Thu Jul 10 11:12:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu Jul 10 11:12:18 2014 -0400
----------------------------------------------------------------------
.../streams/elasticsearch/ElasticsearchClientManager.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0d1fc31a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 340d2b9..d107e70 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -80,7 +80,7 @@ public class ElasticsearchClientManager {
}
public boolean isOnOrAfterVersion(Version version) {
- return ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getVersion().onOrAfter(version);
+ return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
}
public void start() throws Exception {
@@ -110,12 +110,12 @@ public class ElasticsearchClientManager {
public synchronized void stop() {
// Terminate the elasticsearch cluster
// Check to see if we have a client.
- if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.toString())) {
+ if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
// Close the client
- ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getClient().close();
+ ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
// Remove it so that it isn't in memory any more.
- ALL_CLIENTS.remove(this.elasticsearchConfiguration.toString());
+ ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
}
}