You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/02 14:59:16 UTC
[1/4] git commit: Updated Persist writer to have a configurable background flush thread
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-72 [created] f62afa57c
Updated Persist writer to have a configurable background flush thread
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c837a2cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c837a2cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c837a2cf
Branch: refs/heads/STREAMS-72
Commit: c837a2cf8800efd324003ba5a9299c16b9f8701a
Parents: e6ffe29
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 13:59:32 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 13:59:32 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +-
.../ElasticsearchPersistWriter.java | 184 +++++++++++--------
.../ElasticsearchWriterConfiguration.json | 6 +-
3 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 483a177..8a343a6 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
/**
- * Created by sblackmon on 12/10/13.
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
*/
public class ElasticsearchConfigurator {
@@ -51,9 +51,11 @@ public class ElasticsearchConfigurator {
String index = elasticsearch.getString("index");
String type = elasticsearch.getString("type");
+ Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
elasticsearchWriterConfiguration.setIndex(index);
elasticsearchWriterConfiguration.setType(type);
+ elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
return elasticsearchWriterConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ff7e0f5..c574eeb 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -36,10 +36,15 @@ import java.io.OutputStreamWriter;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
public static final String STREAMS_ID = "ElasticsearchPersistWriter";
-
public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
@@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private static final long WAITING_DOCS_LIMIT = 10000;
private static final int BYTES_IN_MB = 1024 * 1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private static final long MAX_MS_BEFORE_FLUSH = 10000;
private final List<String> affectedIndexes = new ArrayList<String>();
+ private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private ObjectMapper mapper = new StreamsJacksonMapper();
private ElasticsearchClientManager manager;
@@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private OutputStreamWriter currentWriter = null;
private int batchSize = 50;
private int totalRecordsWritten = 0;
+ private long maxMsBeforeFlush;
private boolean veryLargeBulk = false; // by default this setting is set to false
protected Thread task;
@@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private volatile int batchItemsSent = 0;
private volatile int totalByteCount = 0;
private volatile int byteCount = 0;
+ private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
public ElasticsearchPersistWriter() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
+ this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
}
public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
@@ -173,6 +183,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
try {
flush();
+ backgroundFlushTask.shutdownNow();
} catch (IOException e) {
e.printStackTrace();
}
@@ -240,6 +251,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void prepare(Object configurationObject) {
+ maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs();
mapper = StreamsJacksonMapper.getInstance();
start();
}
@@ -254,7 +266,17 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
public void start() {
-
+ backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.debug("Checking to see if data needs to be flushed");
+ long time = System.currentTimeMillis() - lastWrite.get();
+ if (time > maxMsBeforeFlush && batchItemsSent > 0) {
+ LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time);
+ flushInternal();
+ }
+ }
+ }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
manager = new ElasticsearchClientManager(config);
client = manager.getClient();
@@ -262,62 +284,63 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
public void flushInternal() {
- synchronized (this) {
- // we do not have a working bulk request, we can just exit here.
- if (this.bulkRequest == null || batchItemsSent == 0)
- return;
+ //This is unlocked in the callback
+ lock.writeLock().lock();
+ // we do not have a working bulk request, we can just exit here.
+ if (this.bulkRequest == null || batchItemsSent == 0)
+ return;
- // call the flush command.
- flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+ // call the flush command.
+ flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
- // null the flush request, this will be created in the 'add' function below
- this.bulkRequest = null;
+ // null the flush request, this will be created in the 'add' function below
+ this.bulkRequest = null;
- // record the proper statistics, and add it to our totals.
- this.totalSizeInBytes += this.batchSizeInBytes;
- this.totalSent += batchItemsSent;
+ // record the proper statistics, and add it to our totals.
+ this.totalSizeInBytes += this.batchSizeInBytes;
+ this.totalSent += batchItemsSent;
- // reset the current batch statistics
- this.batchSizeInBytes = 0;
- this.batchItemsSent = 0;
+ // reset the current batch statistics
+ this.batchSizeInBytes = 0;
+ this.batchItemsSent = 0;
- try {
- int count = 0;
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
- /****************************************************************************
- * Author:
- * Smashew
- *
- * Date:
- * 2013-10-20
- *
- * Note:
- * With the information that we have on hand. We need to develop a heuristic
- * that will determine when the cluster is having a problem indexing records
- * by telling it to pause and wait for it to catch back up. A
- *
- * There is an impact to us, the caller, whenever this happens as well. Items
- * that are not yet fully indexed by the server sit in a queue, on the client
- * that can cause the heap to overflow. This has been seen when re-indexing
- * large amounts of data to a small cluster. The "deletes" + "indexes" can
- * cause the server to have many 'outstandingItems" in queue. Running this
- * software with large amounts of data, on a small cluster, can re-create
- * this problem.
- *
- * DO NOT DELETE THESE LINES
- ****************************************************************************/
-
- // wait for the flush to catch up. We are going to cap this at
- while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
- Thread.sleep(10);
-
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
- LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
- }
- } catch (Exception e) {
- LOGGER.info("We were broken from our loop: {}", e.getMessage());
+ try {
+ int count = 0;
+ if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
+ /****************************************************************************
+ * Author:
+ * Smashew
+ *
+ * Date:
+ * 2013-10-20
+ *
+ * Note:
+ * With the information that we have on hand. We need to develop a heuristic
+ * that will determine when the cluster is having a problem indexing records
+ * by telling it to pause and wait for it to catch back up. A
+ *
+ * There is an impact to us, the caller, whenever this happens as well. Items
+ * that are not yet fully indexed by the server sit in a queue, on the client
+ * that can cause the heap to overflow. This has been seen when re-indexing
+ * large amounts of data to a small cluster. The "deletes" + "indexes" can
+ * cause the server to have many 'outstandingItems" in queue. Running this
+ * software with large amounts of data, on a small cluster, can re-create
+ * this problem.
+ *
+ * DO NOT DELETE THESE LINES
+ ****************************************************************************/
+
+ // wait for the flush to catch up. We are going to cap this at
+ while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
+ Thread.sleep(10);
+
+ if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
+ LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
}
+ } catch (Exception e) {
+ LOGGER.info("We were broken from our loop: {}", e.getMessage());
}
+
}
public void add(String indexName, String type, String json) {
@@ -353,32 +376,35 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
public void add(UpdateRequest updateRequest) {
Preconditions.checkNotNull(updateRequest);
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(updateRequest.index());
- bulkRequest.add(updateRequest);
- try {
- Optional<Integer> size = Objects.firstNonNull(
- Optional.fromNullable(updateRequest.doc().source().length()),
- Optional.fromNullable(updateRequest.script().length()));
- trackItemAndBytesWritten(size.get().longValue());
- } catch (NullPointerException x) {
- trackItemAndBytesWritten(1000);
- }
+ lock.writeLock().lock();
+ checkAndCreateBulkRequest();
+ checkIndexImplications(updateRequest.index());
+ bulkRequest.add(updateRequest);
+ try {
+ Optional<Integer> size = Objects.firstNonNull(
+ Optional.fromNullable(updateRequest.doc().source().length()),
+ Optional.fromNullable(updateRequest.script().length()));
+ trackItemAndBytesWritten(size.get().longValue());
+ } catch (NullPointerException x) {
+ trackItemAndBytesWritten(1000);
+ } finally {
+ lock.writeLock().unlock();
}
}
public void add(IndexRequest indexRequest) {
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(indexRequest.index());
- bulkRequest.add(indexRequest);
- try {
- trackItemAndBytesWritten(indexRequest.source().length());
- } catch (NullPointerException x) {
- LOGGER.warn("NPE adding/sizing indexrequest");
- }
+ lock.writeLock().lock();
+ checkAndCreateBulkRequest();
+ checkIndexImplications(indexRequest.index());
+ bulkRequest.add(indexRequest);
+ try {
+ trackItemAndBytesWritten(indexRequest.source().length());
+ } catch (NullPointerException x) {
+ LOGGER.warn("NPE adding/sizing indexrequest");
+ } finally {
+ lock.writeLock().unlock();
}
+
}
public void createIndexIfMissing(String indexName) {
@@ -447,6 +473,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+ LOGGER.debug("Attempting to write {} items to ES", thisSent);
bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
@@ -473,6 +500,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
if (thisSent != (thisOk + thisFailed))
LOGGER.error("We sent more items than this");
+ lastWrite.set(System.currentTimeMillis());
+ lock.writeLock().unlock();
+
LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
@@ -480,12 +510,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void onFailure(Throwable e) {
+ lock.writeLock().unlock();
LOGGER.error("Error bulk loading: {}", e.getMessage());
e.printStackTrace();
}
});
-
- this.notify();
}
private void trackItemAndBytesWritten(long sizeInBytes) {
@@ -499,9 +528,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private void checkAndCreateBulkRequest() {
// Synchronize to ensure that we don't lose any records
- synchronized (this) {
+ lock.writeLock().lock();
+ try {
if (bulkRequest == null)
bulkRequest = this.manager.getClient().prepareBulk();
+ } finally {
+ lock.writeLock().unlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 21aad5c..c56aa82 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -13,6 +13,10 @@
"type": {
"type": "string",
"description": "Type to write as"
- }
+ },
+ "MaxTimeBetweenFlushMs": {
+ "type": "String",
+ "format": "utc-millisec"
+ }
}
}
\ No newline at end of file
[2/4] git commit: Updated termination process
Posted by mf...@apache.org.
Updated termination process
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6934c7a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6934c7a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6934c7a0
Branch: refs/heads/STREAMS-72
Commit: 6934c7a0d4c479392c6c147d00af8cd9ed7b3c44
Parents: c837a2c
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 18:45:30 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 18:45:30 2014 -0400
----------------------------------------------------------------------
.../local/builders/LocalStreamBuilder.java | 161 ++++++++++++-------
.../local/tasks/StreamsPersistWriterTask.java | 6 +-
.../local/tasks/StreamsProcessorTask.java | 2 +-
3 files changed, 110 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 8e688ba..d313b3f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -1,5 +1,6 @@
package org.apache.streams.local.builders;
+import org.apache.log4j.spi.LoggerFactory;
import org.apache.streams.core.*;
import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
import org.apache.streams.local.tasks.StatusCounterMonitorThread;
@@ -7,6 +8,7 @@ import org.apache.streams.local.tasks.StreamsProviderTask;
import org.apache.streams.local.tasks.StreamsTask;
import org.apache.streams.util.SerializationUtil;
import org.joda.time.DateTime;
+import org.slf4j.Logger;
import java.math.BigInteger;
import java.util.*;
@@ -22,6 +24,8 @@ import java.util.concurrent.TimeUnit;
*/
public class LocalStreamBuilder implements StreamBuilder {
+ private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class);
+
public static final String TIMEOUT_KEY = "TIMEOUT";
private Map<String, StreamComponent> providers;
private Map<String, StreamComponent> components;
@@ -32,6 +36,7 @@ public class LocalStreamBuilder implements StreamBuilder {
private int totalTasks;
private int monitorTasks;
private LocalStreamProcessMonitorThread monitorThread;
+ private Map<String, List<StreamsTask>> tasks;
/**
*
@@ -139,39 +144,18 @@ public class LocalStreamBuilder implements StreamBuilder {
*/
@Override
public void start() {
+ attachShutdownHandler();
boolean isRunning = true;
this.executor = Executors.newFixedThreadPool(this.totalTasks);
this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
- Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+ tasks = new HashMap<String, List<StreamsTask>>();
try {
monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
this.monitor.submit(monitorThread);
- for(StreamComponent comp : this.components.values()) {
- int tasks = comp.getNumTasks();
- List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
- for(int i=0; i < tasks; ++i) {
- StreamsTask task = comp.createConnectedTask(getTimeout());
- task.setStreamConfig(this.streamConfig);
- this.executor.submit(task);
- compTasks.add(task);
- if( comp.isOperationCountable() ) {
- this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
- this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
- }
- }
- streamsTasks.put(comp.getId(), compTasks);
- }
- for(StreamComponent prov : this.providers.values()) {
- StreamsTask task = prov.createConnectedTask(getTimeout());
- task.setStreamConfig(this.streamConfig);
- this.executor.submit(task);
- provTasks.put(prov.getId(), (StreamsProviderTask) task);
- if( prov.isOperationCountable() ) {
- this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
- this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
- }
- }
+ setupComponentTasks(tasks);
+ setupProviderTasks(provTasks);
+ LOGGER.info("Started stream with {} components", tasks.size());
while(isRunning) {
isRunning = false;
for(StreamsProviderTask task : provTasks.values()) {
@@ -184,44 +168,98 @@ public class LocalStreamBuilder implements StreamBuilder {
Thread.sleep(3000);
}
}
- monitorThread.shutdown();
- this.executor.shutdown();
- //complete stream shut down gracfully
- for(StreamComponent prov : this.providers.values()) {
- shutDownTask(prov, streamsTasks);
+ LOGGER.debug("Components are no longer running or timed out due to completion");
+ shutdown(tasks);
+ } catch (InterruptedException e){
+ forceShutdown(tasks);
+ }
+
+ }
+
+ private void attachShutdownHandler() {
+ final LocalStreamBuilder self = this;
+ LOGGER.debug("Attaching shutdown handler");
+ Runtime.getRuntime().addShutdownHook(new Thread(){
+ @Override
+ public void run() {
+ LOGGER.debug("Shutdown hook received. Beginning shutdown");
+ self.stop();
+ }
+ });
+ }
+
+ protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
+ LOGGER.debug("Shutdown failed. Forcing shutdown");
+ //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
+ for(List<StreamsTask> tasks : streamsTasks.values()) {
+ for(StreamsTask task : tasks) {
+ task.stopTask();
}
- //need to make this configurable
- if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ }
+ this.executor.shutdown();
+ this.monitor.shutdown();
+ try {
+ if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
this.executor.shutdownNow();
- this.executor.awaitTermination(10, TimeUnit.SECONDS);
}
- if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
this.monitor.shutdownNow();
- this.monitor.awaitTermination(5, TimeUnit.SECONDS);
}
- } catch (InterruptedException e){
- //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
- for(List<StreamsTask> tasks : streamsTasks.values()) {
- for(StreamsTask task : tasks) {
- task.stopTask();
- }
+ }catch (InterruptedException ie) {
+ this.executor.shutdownNow();
+ this.monitor.shutdownNow();
+ throw new RuntimeException(ie);
+ }
+ }
+
+ protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
+ LOGGER.info("Attempting to shutdown tasks");
+ monitorThread.shutdown();
+ this.executor.shutdown();
+ //complete stream shut down gracfully
+ for(StreamComponent prov : this.providers.values()) {
+ shutDownTask(prov, streamsTasks);
+ }
+ //need to make this configurable
+ if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ this.executor.shutdownNow();
+ this.executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+ this.monitor.shutdownNow();
+ this.monitor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) {
+ for(StreamComponent prov : this.providers.values()) {
+ StreamsTask task = prov.createConnectedTask(getTimeout());
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ provTasks.put(prov.getId(), (StreamsProviderTask) task);
+ if( prov.isOperationCountable() ) {
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
}
- this.executor.shutdown();
- this.monitor.shutdown();
- try {
- if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
- this.executor.shutdownNow();
- }
- if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
- this.monitor.shutdownNow();
+ }
+ }
+
+ protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) {
+ for(StreamComponent comp : this.components.values()) {
+ int tasks = comp.getNumTasks();
+ List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+ for(int i=0; i < tasks; ++i) {
+ StreamsTask task = comp.createConnectedTask(getTimeout());
+ task.setStreamConfig(this.streamConfig);
+ this.executor.submit(task);
+ compTasks.add(task);
+ if( comp.isOperationCountable() ) {
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+ this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
}
- }catch (InterruptedException ie) {
- this.executor.shutdownNow();
- this.monitor.shutdownNow();
- throw new RuntimeException(ie);
}
+ streamsTasks.put(comp.getId(), compTasks);
}
-
}
/**
@@ -249,8 +287,13 @@ public class LocalStreamBuilder implements StreamBuilder {
task.stopTask();
}
for(StreamsTask task : tasks) {
- while(task.isRunning()) {
+ int count = 0;
+ while(count < 20 && task.isRunning()) {
Thread.sleep(500);
+ count++;
+ }
+ if(task.isRunning()) {
+ LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString());
}
}
}
@@ -268,7 +311,11 @@ public class LocalStreamBuilder implements StreamBuilder {
*/
@Override
public void stop() {
-
+ try {
+ shutdown(tasks);
+ } catch (Exception e) {
+ forceShutdown(tasks);
+ }
}
private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 1eac1d9..8146bdd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -72,12 +72,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
try {
this.writer.prepare(this.streamConfig);
StreamsDatum datum = this.inQueue.poll();
- while(datum != null || this.keepRunning.get()) {
+ while(this.keepRunning.get()) {
if(datum != null) {
try {
this.writer.write(datum);
statusCounter.incrementStatus(DatumStatus.SUCCESS);
} catch (Exception e) {
+ LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
this.keepRunning.set(false);
statusCounter.incrementStatus(DatumStatus.FAIL);
}
@@ -86,12 +87,15 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
try {
Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e);
this.keepRunning.set(false);
}
}
datum = this.inQueue.poll();
}
+ } catch(Exception e) {
+ LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e);
} finally {
this.writer.cleanUp();
this.isRunning.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index d1ac905..d4c7a16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -67,7 +67,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
try {
this.processor.prepare(this.streamConfig);
StreamsDatum datum = this.inQueue.poll();
- while(datum != null || this.keepRunning.get()) {
+ while(this.keepRunning.get()) {
if(datum != null) {
List<StreamsDatum> output = this.processor.process(datum);
if(output != null) {
[3/4] git commit: Fixed deadlock issue
Posted by mf...@apache.org.
Fixed deadlock issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb8f7aea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb8f7aea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb8f7aea
Branch: refs/heads/STREAMS-72
Commit: fb8f7aea3032296a4c0dd779aca976104de44d61
Parents: 6934c7a
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 19:53:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 19:53:18 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f7aea/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c574eeb..5756e1c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -54,7 +54,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
private static final long WAITING_DOCS_LIMIT = 10000;
private static final int BYTES_IN_MB = 1024 * 1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
- private static final long MAX_MS_BEFORE_FLUSH = 10000;
+ private static final long DEFAULT_MAX_WAIT = 10000;
private final List<String> affectedIndexes = new ArrayList<String>();
private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
@@ -251,7 +251,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void prepare(Object configurationObject) {
- maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs();
+ maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
mapper = StreamsJacksonMapper.getInstance();
start();
}
@@ -284,7 +284,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
public void flushInternal() {
- //This is unlocked in the callback
lock.writeLock().lock();
// we do not have a working bulk request, we can just exit here.
if (this.bulkRequest == null || batchItemsSent == 0)
@@ -339,6 +338,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
} catch (Exception e) {
LOGGER.info("We were broken from our loop: {}", e.getMessage());
+ } finally {
+ lock.writeLock().unlock();
}
}
@@ -472,11 +473,20 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
return toReturn;
}
+ /**
+ * This method is to ONLY be called by flushInternal otherwise the counts will be off.
+ * @param bulkRequest
+ * @param thisSent
+ * @param thisSizeInBytes
+ */
private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+ final Object messenger = new Object();
LOGGER.debug("Attempting to write {} items to ES", thisSent);
bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
+ lastWrite.set(System.currentTimeMillis());
+
if (bulkItemResponses.hasFailures())
LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
@@ -500,9 +510,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
if (thisSent != (thisOk + thisFailed))
LOGGER.error("We sent more items than this");
- lastWrite.set(System.currentTimeMillis());
- lock.writeLock().unlock();
-
LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
@@ -510,7 +517,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
@Override
public void onFailure(Throwable e) {
- lock.writeLock().unlock();
LOGGER.error("Error bulk loading: {}", e.getMessage());
e.printStackTrace();
}
[4/4] git commit: Fixed issue where exception terminated thread and it wouldn't start again
Posted by mf...@apache.org.
Fixed issue where exception terminated thread and it wouldn't start again
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f62afa57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f62afa57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f62afa57
Branch: refs/heads/STREAMS-72
Commit: f62afa57ce5ca19d639c84e89ea29f998a16fbe5
Parents: fb8f7ae
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 19:54:56 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 19:54:56 2014 -0400
----------------------------------------------------------------------
.../provider/SysomosHeartbeatStream.java | 45 +++++++++++---------
1 file changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f62afa57/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c5145fb..9cd5898 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -72,29 +72,36 @@ public class SysomosHeartbeatStream implements Runnable {
}
protected QueryResult executeAPIRequest() {
- BeatApi.BeatResponse response = this.client.createRequestBuilder()
- .setHeartBeatId(heartbeatId)
- .setOffset(0)
- .setReturnSetSize(maxApiBatch).execute();
-
- LOGGER.debug("Received {} results from API query", response.getCount());
+ BeatApi.BeatResponse response = null;
+ try {
+ response = this.client.createRequestBuilder()
+ .setHeartBeatId(heartbeatId)
+ .setOffset(0)
+ .setReturnSetSize(maxApiBatch).execute();
+
+ LOGGER.debug("Received {} results from API query", response.getCount());
+ } catch (Exception e) {
+ LOGGER.warn("Error querying Sysomos API", e);
+ }
String currentId = null;
boolean matched = false;
- for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
- String docId = beat.getDocid();
- //We get documents in descending time order. This will set the id to the latest document
- if(currentId == null) {
- currentId = docId;
- }
- //We only want to process documents that we know we have not seen before
- if(lastID != null && lastID.equals(docId)) {
- matched = true;
- break;
+ if(response != null) {
+ for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
+ String docId = beat.getDocid();
+ //We get documents in descending time order. This will set the id to the latest document
+ if (currentId == null) {
+ currentId = docId;
+ }
+ //We only want to process documents that we know we have not seen before
+ if (lastID != null && lastID.equals(docId)) {
+ matched = true;
+ break;
+ }
+ StreamsDatum item = new StreamsDatum(beat, docId);
+ item.getMetadata().put("heartbeat", this.heartbeatId);
+ this.provider.enqueueItem(item);
}
- StreamsDatum item = new StreamsDatum(beat, docId);
- item.getMetadata().put("heartbeat", this.heartbeatId);
- this.provider.enqueueItem(item);
}
return new QueryResult(matched, currentId);
}