Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/02 14:59:16 UTC

[1/4] git commit: Updated Persist writer to have a configurable background flush thread

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-72 [created] f62afa57c


Updated Persist writer to have a configurable background flush thread


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c837a2cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c837a2cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c837a2cf

Branch: refs/heads/STREAMS-72
Commit: c837a2cf8800efd324003ba5a9299c16b9f8701a
Parents: e6ffe29
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 13:59:32 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 13:59:32 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +-
 .../ElasticsearchPersistWriter.java             | 184 +++++++++++--------
 .../ElasticsearchWriterConfiguration.json       |   6 +-
 3 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 483a177..8a343a6 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
  */
 public class ElasticsearchConfigurator {
 
@@ -51,9 +51,11 @@ public class ElasticsearchConfigurator {
 
         String index = elasticsearch.getString("index");
         String type = elasticsearch.getString("type");
+        Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
 
         elasticsearchWriterConfiguration.setIndex(index);
         elasticsearchWriterConfiguration.setType(type);
+        elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
 
         return elasticsearchWriterConfiguration;
     }
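
For reference, the new MaxTimeBetweenFlushMs key is read out of the same "elasticsearch" Config block as the existing index and type settings, and a missing key deliberately yields null so the writer can fall back to its built-in default. A minimal sketch of that lookup against the Typesafe Config API (the sample values below are illustrative, not taken from the commit):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class FlushConfigExample {
        public static void main(String[] args) {
            // Hypothetical stand-in for the application's "elasticsearch" block.
            Config root = ConfigFactory.parseString(
                    "elasticsearch { index = \"activity\", type = \"post\", MaxTimeBetweenFlushMs = 30000 }");
            Config elasticsearch = root.getConfig("elasticsearch");

            // Optional key: null means "use the writer's default flush interval".
            Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs")
                    ? elasticsearch.getLong("MaxTimeBetweenFlushMs")
                    : null;
            System.out.println("max ms between flushes = " + maxMsBeforeFlush);
        }
    }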

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ff7e0f5..c574eeb 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -36,10 +36,15 @@ import java.io.OutputStreamWriter;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
     public static final String STREAMS_ID = "ElasticsearchPersistWriter";
-
     public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
@@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private static final long WAITING_DOCS_LIMIT = 10000;
     private static final int BYTES_IN_MB = 1024 * 1024;
     private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+    private static final long MAX_MS_BEFORE_FLUSH = 10000;
 
     private final List<String> affectedIndexes = new ArrayList<String>();
+    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private ObjectMapper mapper = new StreamsJacksonMapper();
     private ElasticsearchClientManager manager;
@@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private OutputStreamWriter currentWriter = null;
     private int batchSize = 50;
     private int totalRecordsWritten = 0;
+    private long maxMsBeforeFlush;
     private boolean veryLargeBulk = false;  // by default this setting is set to false
 
     protected Thread task;
@@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private volatile int batchItemsSent = 0;
     private volatile int totalByteCount = 0;
     private volatile int byteCount = 0;
+    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
 
     public ElasticsearchPersistWriter() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
     }
 
     public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
@@ -173,6 +183,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
         try {
             flush();
+            backgroundFlushTask.shutdownNow();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -240,6 +251,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     @Override
     public void prepare(Object configurationObject) {
+        maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs();
         mapper = StreamsJacksonMapper.getInstance();
         start();
     }
@@ -254,7 +266,17 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     }
 
     public void start() {
-
+        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                LOGGER.debug("Checking to see if data needs to be flushed");
+                long time = System.currentTimeMillis() - lastWrite.get();
+                if (time > maxMsBeforeFlush && batchItemsSent > 0) {
+                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
+                    flushInternal();
+                }
+            }
+        }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
         manager = new ElasticsearchClientManager(config);
         client = manager.getClient();
 
@@ -262,62 +284,63 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     }
 
     public void flushInternal() {
-        synchronized (this) {
-            // we do not have a working bulk request, we can just exit here.
-            if (this.bulkRequest == null || batchItemsSent == 0)
-                return;
+        //This is unlocked in the callback
+        lock.writeLock().lock();
+        // we do not have a working bulk request, we can just exit here.
+        if (this.bulkRequest == null || batchItemsSent == 0)
+            return;
 
-            // call the flush command.
-            flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+        // call the flush command.
+        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
 
-            // null the flush request, this will be created in the 'add' function below
-            this.bulkRequest = null;
+        // null the flush request, this will be created in the 'add' function below
+        this.bulkRequest = null;
 
-            // record the proper statistics, and add it to our totals.
-            this.totalSizeInBytes += this.batchSizeInBytes;
-            this.totalSent += batchItemsSent;
+        // record the proper statistics, and add it to our totals.
+        this.totalSizeInBytes += this.batchSizeInBytes;
+        this.totalSent += batchItemsSent;
 
-            // reset the current batch statistics
-            this.batchSizeInBytes = 0;
-            this.batchItemsSent = 0;
+        // reset the current batch statistics
+        this.batchSizeInBytes = 0;
+        this.batchItemsSent = 0;
 
-            try {
-                int count = 0;
-                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
-                    /****************************************************************************
-                     * Author:
-                     * Smashew
-                     *
-                     * Date:
-                     * 2013-10-20
-                     *
-                     * Note:
-                     * With the information that we have on hand. We need to develop a heuristic
-                     * that will determine when the cluster is having a problem indexing records
-                     * by telling it to pause and wait for it to catch back up. A
-                     *
-                     * There is an impact to us, the caller, whenever this happens as well. Items
-                     * that are not yet fully indexed by the server sit in a queue, on the client
-                     * that can cause the heap to overflow. This has been seen when re-indexing
-                     * large amounts of data to a small cluster. The "deletes" + "indexes" can
-                     * cause the server to have many 'outstandingItems" in queue. Running this
-                     * software with large amounts of data, on a small cluster, can re-create
-                     * this problem.
-                     *
-                     * DO NOT DELETE THESE LINES
-                     ****************************************************************************/
-
-                    // wait for the flush to catch up. We are going to cap this at
-                    while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
-                        Thread.sleep(10);
-
-                    if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
-                        LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
-                }
-            } catch (Exception e) {
-                LOGGER.info("We were broken from our loop: {}", e.getMessage());
+        try {
+            int count = 0;
+            if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
+                /****************************************************************************
+                 * Author:
+                 * Smashew
+                 *
+                 * Date:
+                 * 2013-10-20
+                 *
+                 * Note:
+                 * With the information that we have on hand. We need to develop a heuristic
+                 * that will determine when the cluster is having a problem indexing records
+                 * by telling it to pause and wait for it to catch back up. A
+                 *
+                 * There is an impact to us, the caller, whenever this happens as well. Items
+                 * that are not yet fully indexed by the server sit in a queue, on the client
+                 * that can cause the heap to overflow. This has been seen when re-indexing
+                 * large amounts of data to a small cluster. The "deletes" + "indexes" can
+                 * cause the server to have many 'outstandingItems" in queue. Running this
+                 * software with large amounts of data, on a small cluster, can re-create
+                 * this problem.
+                 *
+                 * DO NOT DELETE THESE LINES
+                 ****************************************************************************/
+
+                // wait for the flush to catch up. We are going to cap this at
+                while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
+                    Thread.sleep(10);
+
+                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
+                    LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
             }
+        } catch (Exception e) {
+            LOGGER.info("We were broken from our loop: {}", e.getMessage());
         }
+
     }
 
     public void add(String indexName, String type, String json) {
@@ -353,32 +376,35 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     public void add(UpdateRequest updateRequest) {
         Preconditions.checkNotNull(updateRequest);
-        synchronized (this) {
-            checkAndCreateBulkRequest();
-            checkIndexImplications(updateRequest.index());
-            bulkRequest.add(updateRequest);
-            try {
-                Optional<Integer> size = Objects.firstNonNull(
-                        Optional.fromNullable(updateRequest.doc().source().length()),
-                        Optional.fromNullable(updateRequest.script().length()));
-                trackItemAndBytesWritten(size.get().longValue());
-            } catch (NullPointerException x) {
-                trackItemAndBytesWritten(1000);
-            }
+        lock.writeLock().lock();
+        checkAndCreateBulkRequest();
+        checkIndexImplications(updateRequest.index());
+        bulkRequest.add(updateRequest);
+        try {
+            Optional<Integer> size = Objects.firstNonNull(
+                    Optional.fromNullable(updateRequest.doc().source().length()),
+                    Optional.fromNullable(updateRequest.script().length()));
+            trackItemAndBytesWritten(size.get().longValue());
+        } catch (NullPointerException x) {
+            trackItemAndBytesWritten(1000);
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     public void add(IndexRequest indexRequest) {
-        synchronized (this) {
-            checkAndCreateBulkRequest();
-            checkIndexImplications(indexRequest.index());
-            bulkRequest.add(indexRequest);
-            try {
-                trackItemAndBytesWritten(indexRequest.source().length());
-            } catch (NullPointerException x) {
-                LOGGER.warn("NPE adding/sizing indexrequest");
-            }
+        lock.writeLock().lock();
+        checkAndCreateBulkRequest();
+        checkIndexImplications(indexRequest.index());
+        bulkRequest.add(indexRequest);
+        try {
+            trackItemAndBytesWritten(indexRequest.source().length());
+        } catch (NullPointerException x) {
+            LOGGER.warn("NPE adding/sizing indexrequest");
+        } finally {
+            lock.writeLock().unlock();
         }
+
     }
 
     public void createIndexIfMissing(String indexName) {
@@ -447,6 +473,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     }
 
     private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+        LOGGER.debug("Attempting to write {} items to ES", thisSent);
         bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse bulkItemResponses) {
@@ -473,6 +500,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                 if (thisSent != (thisOk + thisFailed))
                     LOGGER.error("We sent more items than this");
 
+                lastWrite.set(System.currentTimeMillis());
+                lock.writeLock().unlock();
+
                 LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                         MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                         MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
@@ -480,12 +510,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
             @Override
             public void onFailure(Throwable e) {
+                lock.writeLock().unlock();
                 LOGGER.error("Error bulk loading: {}", e.getMessage());
                 e.printStackTrace();
             }
         });
-
-        this.notify();
     }
 
     private void trackItemAndBytesWritten(long sizeInBytes) {
@@ -499,9 +528,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     private void checkAndCreateBulkRequest() {
         // Synchronize to ensure that we don't lose any records
-        synchronized (this) {
+        lock.writeLock().lock();
+        try {
             if (bulkRequest == null)
                 bulkRequest = this.manager.getClient().prepareBulk();
+        } finally {
+            lock.writeLock().unlock();
         }
     }
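
The heart of this change is the background flush: a single-threaded ScheduledExecutorService wakes up at twice the configured interval and forces a flush whenever data has been sitting unwritten for longer than maxMsBeforeFlush. A stripped-down sketch of that pattern, with names simplified from the diff and the bulk-request plumbing stubbed out:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class BackgroundFlushSketch {
        private final ScheduledExecutorService backgroundFlushTask =
                Executors.newSingleThreadScheduledExecutor();
        private final AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
        private final long maxMsBeforeFlush = 10000;   // MAX_MS_BEFORE_FLUSH in the commit
        private volatile int batchItemsSent = 0;

        public void start() {
            backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    long idle = System.currentTimeMillis() - lastWrite.get();
                    // Only flush when something is pending and the batch has gone stale.
                    if (idle > maxMsBeforeFlush && batchItemsSent > 0) {
                        flushInternal();
                    }
                }
            }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
        }

        public void cleanUp() {
            backgroundFlushTask.shutdownNow();   // as in the updated cleanUp()
        }

        private void flushInternal() { /* submit the pending bulk request */ }
    }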
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 21aad5c..c56aa82 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -13,6 +13,10 @@
         "type": {
             "type": "string",
             "description": "Type to write as"
-        }
+        },
+		"MaxTimeBetweenFlushMs": {
+			"type": "String",
+			"format": "utc-millisec"
+		}
     }
 }
\ No newline at end of file
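
One caveat on the schema hunk above: JSON Schema type names are lowercase, and the draft-03 "utc-millisec" format is defined for numeric values, which is also what the Java side expects (the writer null-checks a boxed Long from getMaxTimeBetweenFlushMs()). If the configuration bean is generated with jsonschema2pojo, a fragment along these lines would likely be cleaner (a hypothetical correction, not part of the commit):

		"MaxTimeBetweenFlushMs": {
			"type": "integer",
			"format": "utc-millisec",
			"description": "Maximum time in milliseconds between flushes of the bulk buffer"
		}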


[2/4] git commit: Updated termination process

Posted by mf...@apache.org.
Updated termination process


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6934c7a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6934c7a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6934c7a0

Branch: refs/heads/STREAMS-72
Commit: 6934c7a0d4c479392c6c147d00af8cd9ed7b3c44
Parents: c837a2c
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 18:45:30 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 18:45:30 2014 -0400

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 161 ++++++++++++-------
 .../local/tasks/StreamsPersistWriterTask.java   |   6 +-
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 3 files changed, 110 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 8e688ba..d313b3f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -1,5 +1,6 @@
 package org.apache.streams.local.builders;
 
+import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
@@ -7,6 +8,7 @@ import org.apache.streams.local.tasks.StreamsProviderTask;
 import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
 import org.joda.time.DateTime;
+import org.slf4j.Logger;
 
 import java.math.BigInteger;
 import java.util.*;
@@ -22,6 +24,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class LocalStreamBuilder implements StreamBuilder {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class);
+
     public static final String TIMEOUT_KEY = "TIMEOUT";
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;
@@ -32,6 +36,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private int totalTasks;
     private int monitorTasks;
     private LocalStreamProcessMonitorThread monitorThread;
+    private Map<String, List<StreamsTask>> tasks;
 
     /**
      *
@@ -139,39 +144,18 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void start() {
+        attachShutdownHandler();
         boolean isRunning = true;
         this.executor = Executors.newFixedThreadPool(this.totalTasks);
         this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
-        Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+        tasks = new HashMap<String, List<StreamsTask>>();
         try {
             monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
             this.monitor.submit(monitorThread);
-            for(StreamComponent comp : this.components.values()) {
-                int tasks = comp.getNumTasks();
-                List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
-                for(int i=0; i < tasks; ++i) {
-                    StreamsTask task = comp.createConnectedTask(getTimeout());
-                    task.setStreamConfig(this.streamConfig);
-                    this.executor.submit(task);
-                    compTasks.add(task);
-                    if( comp.isOperationCountable() ) {
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                    }
-                }
-                streamsTasks.put(comp.getId(), compTasks);
-            }
-            for(StreamComponent prov : this.providers.values()) {
-                StreamsTask task = prov.createConnectedTask(getTimeout());
-                task.setStreamConfig(this.streamConfig);
-                this.executor.submit(task);
-                provTasks.put(prov.getId(), (StreamsProviderTask) task);
-                if( prov.isOperationCountable() ) {
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                }
-            }
+            setupComponentTasks(tasks);
+            setupProviderTasks(provTasks);
+            LOGGER.info("Started stream with {} components", tasks.size());
             while(isRunning) {
                 isRunning = false;
                 for(StreamsProviderTask task : provTasks.values()) {
@@ -184,44 +168,98 @@ public class LocalStreamBuilder implements StreamBuilder {
                     Thread.sleep(3000);
                 }
             }
-            monitorThread.shutdown();
-            this.executor.shutdown();
-            //complete stream shut down gracfully
-            for(StreamComponent prov : this.providers.values()) {
-                shutDownTask(prov, streamsTasks);
+            LOGGER.debug("Components are no longer running or timed out due to completion");
+            shutdown(tasks);
+        } catch (InterruptedException e){
+            forceShutdown(tasks);
+        }
+
+    }
+
+    private void attachShutdownHandler() {
+        final LocalStreamBuilder self = this;
+        LOGGER.debug("Attaching shutdown handler");
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
+                self.stop();
+            }
+        });
+    }
+
+    protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
+        LOGGER.debug("Shutdown failed.  Forcing shutdown");
+        //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
+        for(List<StreamsTask> tasks : streamsTasks.values()) {
+            for(StreamsTask task : tasks) {
+                task.stopTask();
             }
-            //need to make this configurable
-            if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+        }
+        this.executor.shutdown();
+        this.monitor.shutdown();
+        try {
+            if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
                 this.executor.shutdownNow();
-                this.executor.awaitTermination(10, TimeUnit.SECONDS);
             }
-            if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
                 this.monitor.shutdownNow();
-                this.monitor.awaitTermination(5, TimeUnit.SECONDS);
             }
-        } catch (InterruptedException e){
-            //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
-            for(List<StreamsTask> tasks : streamsTasks.values()) {
-                for(StreamsTask task : tasks) {
-                    task.stopTask();
-                }
+        }catch (InterruptedException ie) {
+            this.executor.shutdownNow();
+            this.monitor.shutdownNow();
+            throw new RuntimeException(ie);
+        }
+    }
+
+    protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
+        LOGGER.info("Attempting to shutdown tasks");
+        monitorThread.shutdown();
+        this.executor.shutdown();
+        //complete stream shut down gracfully
+        for(StreamComponent prov : this.providers.values()) {
+            shutDownTask(prov, streamsTasks);
+        }
+        //need to make this configurable
+        if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            this.executor.shutdownNow();
+            this.executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            this.monitor.shutdownNow();
+            this.monitor.awaitTermination(5, TimeUnit.SECONDS);
+        }
+    }
+
+    protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) {
+        for(StreamComponent prov : this.providers.values()) {
+            StreamsTask task = prov.createConnectedTask(getTimeout());
+            task.setStreamConfig(this.streamConfig);
+            this.executor.submit(task);
+            provTasks.put(prov.getId(), (StreamsProviderTask) task);
+            if( prov.isOperationCountable() ) {
+                this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+                this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
             }
-            this.executor.shutdown();
-            this.monitor.shutdown();
-            try {
-                if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
-                    this.executor.shutdownNow();
-                }
-                if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
-                    this.monitor.shutdownNow();
+        }
+    }
+
+    protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) {
+        for(StreamComponent comp : this.components.values()) {
+            int tasks = comp.getNumTasks();
+            List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+            for(int i=0; i < tasks; ++i) {
+                StreamsTask task = comp.createConnectedTask(getTimeout());
+                task.setStreamConfig(this.streamConfig);
+                this.executor.submit(task);
+                compTasks.add(task);
+                if( comp.isOperationCountable() ) {
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }
-            }catch (InterruptedException ie) {
-                this.executor.shutdownNow();
-                this.monitor.shutdownNow();
-                throw new RuntimeException(ie);
             }
+            streamsTasks.put(comp.getId(), compTasks);
         }
-
     }
 
     /**
@@ -249,8 +287,13 @@ public class LocalStreamBuilder implements StreamBuilder {
                     task.stopTask();
                 }
                 for(StreamsTask task : tasks) {
-                    while(task.isRunning()) {
+                    int count = 0;
+                    while(count < 20 && task.isRunning()) {
                         Thread.sleep(500);
+                        count++;
+                    }
+                    if(task.isRunning()) {
+                        LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString());
                     }
                 }
             }
@@ -268,7 +311,11 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void stop() {
-
+        try {
+            shutdown(tasks);
+        } catch (Exception e) {
+            forceShutdown(tasks);
+        }
     }
 
     private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) {
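
The rewritten LocalStreamBuilder splits termination into a graceful path (shutdown()) and a forced path (forceShutdown()), and registers a JVM shutdown hook so that stop() also runs on SIGTERM. The essential executor idiom, reduced to a sketch that keeps the diff's grace periods and simplifies everything else:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownSketch {
        private final ExecutorService executor = Executors.newFixedThreadPool(4);

        public ShutdownSketch() {
            // attachShutdownHandler(): make JVM exit trigger an orderly stop.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    stop();
                }
            });
        }

        public void stop() {
            executor.shutdown();   // stop accepting work; let queued tasks drain
            try {
                // Give tasks a grace period, then escalate to an interrupting shutdown.
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                    executor.awaitTermination(10, TimeUnit.SECONDS);
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }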

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 1eac1d9..8146bdd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -72,12 +72,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
         try {
             this.writer.prepare(this.streamConfig);
             StreamsDatum datum = this.inQueue.poll();
-            while(datum != null || this.keepRunning.get()) {
+            while(this.keepRunning.get()) {
                 if(datum != null) {
                     try {
                         this.writer.write(datum);
                         statusCounter.incrementStatus(DatumStatus.SUCCESS);
                     } catch (Exception e) {
+                        LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false);
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                     }
@@ -86,12 +87,15 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
                     try {
                         Thread.sleep(this.sleepTime);
                     } catch (InterruptedException e) {
+                        LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false);
                     }
                 }
                 datum = this.inQueue.poll();
             }
 
+        } catch(Exception e) {
+            LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e);
         } finally {
             this.writer.cleanUp();
             this.isRunning.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index d1ac905..d4c7a16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -67,7 +67,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
         try {
             this.processor.prepare(this.streamConfig);
             StreamsDatum datum = this.inQueue.poll();
-            while(datum != null || this.keepRunning.get()) {
+            while(this.keepRunning.get()) {
                 if(datum != null) {
                     List<StreamsDatum> output = this.processor.process(datum);
                     if(output != null) {
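
Both task loops above change their condition from "while(datum != null || keepRunning.get())" to "while(keepRunning.get())": under the old condition a task that had been told to stop would keep iterating as long as new data kept arriving, which is what made stream termination hang. A self-contained sketch of the corrected loop shape (queue type and sleep interval are illustrative):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class PollLoopSketch implements Runnable {
        private final Queue<Object> inQueue = new ConcurrentLinkedQueue<Object>();
        private final AtomicBoolean keepRunning = new AtomicBoolean(true);

        @Override
        public void run() {
            Object datum = inQueue.poll();
            // The flag alone decides whether to continue: a null datum just means
            // "idle for now", while a cleared flag must end the task promptly even
            // if data is still flowing.
            while (keepRunning.get()) {
                if (datum != null) {
                    process(datum);
                } else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        keepRunning.set(false);   // treat interruption as a stop signal
                    }
                }
                datum = inQueue.poll();
            }
        }

        public void stopTask() { keepRunning.set(false); }

        private void process(Object datum) { /* write or transform the datum */ }
    }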


[3/4] git commit: Fixed deadlock issue

Posted by mf...@apache.org.
Fixed deadlock issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb8f7aea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb8f7aea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb8f7aea

Branch: refs/heads/STREAMS-72
Commit: fb8f7aea3032296a4c0dd779aca976104de44d61
Parents: 6934c7a
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 19:53:18 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 19:53:18 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb8f7aea/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c574eeb..5756e1c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -54,7 +54,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private static final long WAITING_DOCS_LIMIT = 10000;
     private static final int BYTES_IN_MB = 1024 * 1024;
     private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
-    private static final long MAX_MS_BEFORE_FLUSH = 10000;
+    private static final long DEFAULT_MAX_WAIT = 10000;
 
     private final List<String> affectedIndexes = new ArrayList<String>();
     private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
@@ -251,7 +251,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     @Override
     public void prepare(Object configurationObject) {
-        maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs();
+        maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
         mapper = StreamsJacksonMapper.getInstance();
         start();
     }
@@ -284,7 +284,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     }
 
     public void flushInternal() {
-        //This is unlocked in the callback
         lock.writeLock().lock();
         // we do not have a working bulk request, we can just exit here.
         if (this.bulkRequest == null || batchItemsSent == 0)
@@ -339,6 +338,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             }
         } catch (Exception e) {
             LOGGER.info("We were broken from our loop: {}", e.getMessage());
+        } finally {
+            lock.writeLock().unlock();
         }
 
     }
@@ -472,11 +473,20 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         return toReturn;
     }
 
+    /**
+     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
+     * @param bulkRequest
+     * @param thisSent
+     * @param thisSizeInBytes
+     */
     private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+        final Object messenger = new Object();
         LOGGER.debug("Attempting to write {} items to ES", thisSent);
         bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse bulkItemResponses) {
+                lastWrite.set(System.currentTimeMillis());
+
                 if (bulkItemResponses.hasFailures())
                     LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
 
@@ -500,9 +510,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                 if (thisSent != (thisOk + thisFailed))
                     LOGGER.error("We sent more items than this");
 
-                lastWrite.set(System.currentTimeMillis());
-                lock.writeLock().unlock();
-
                 LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                         MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                         MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
@@ -510,7 +517,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
             @Override
             public void onFailure(Throwable e) {
-                lock.writeLock().unlock();
                 LOGGER.error("Error bulk loading: {}", e.getMessage());
                 e.printStackTrace();
             }
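
The deadlock this commit removes came from splitting a lock acquisition and its release across threads: flushInternal() took the write lock on the calling thread, but the matching unlock lived in the bulk request's ActionListener, which Elasticsearch invokes from a different thread. A ReentrantReadWriteLock write lock is owned by the thread that acquired it, so a cross-thread unlock throws IllegalMonitorStateException, the lock is never released, and every later add() blocks forever. The fix keeps lock and unlock in the same frame; a minimal sketch of that shape (note that, judging from the hunks alone, the early return taken when no batch is pending still appears to skip the unlock in this revision):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockScopeSketch {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        public void flushInternal() {
            lock.writeLock().lock();
            try {
                // Swap out the pending batch and hand it to the async bulk call.
                // The listener only records timing; it never touches the lock.
                sendBatchAsync();
            } finally {
                lock.writeLock().unlock();   // released by the same thread that took it
            }
        }

        private void sendBatchAsync() { /* bulkRequest.execute().addListener(...) */ }
    }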


[4/4] git commit: Fixed issue where exception terminated thread and it wouldn't start again

Posted by mf...@apache.org.
Fixed issue where exception terminated thread and it wouldn't start again


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f62afa57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f62afa57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f62afa57

Branch: refs/heads/STREAMS-72
Commit: f62afa57ce5ca19d639c84e89ea29f998a16fbe5
Parents: fb8f7ae
Author: mfranklin <mf...@apache.org>
Authored: Thu May 1 19:54:56 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 1 19:54:56 2014 -0400

----------------------------------------------------------------------
 .../provider/SysomosHeartbeatStream.java        | 45 +++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f62afa57/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index c5145fb..9cd5898 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -72,29 +72,36 @@ public class SysomosHeartbeatStream implements Runnable {
     }
 
     protected QueryResult executeAPIRequest() {
-        BeatApi.BeatResponse response = this.client.createRequestBuilder()
-                .setHeartBeatId(heartbeatId)
-                .setOffset(0)
-                .setReturnSetSize(maxApiBatch).execute();
-
-        LOGGER.debug("Received {} results from API query", response.getCount());
+        BeatApi.BeatResponse response = null;
+        try {
+            response = this.client.createRequestBuilder()
+                    .setHeartBeatId(heartbeatId)
+                    .setOffset(0)
+                    .setReturnSetSize(maxApiBatch).execute();
+
+            LOGGER.debug("Received {} results from API query", response.getCount());
+        } catch (Exception e) {
+            LOGGER.warn("Error querying Sysomos API", e);
+        }
 
         String currentId = null;
         boolean matched = false;
-        for(BeatApi.BeatResponse.Beat beat : response.getBeat()) {
-            String docId = beat.getDocid();
-            //We get documents in descending time order.  This will set the id to the latest document
-            if(currentId == null) {
-                currentId = docId;
-            }
-            //We only want to process documents that we know we have not seen before
-            if(lastID != null && lastID.equals(docId)) {
-                matched = true;
-                break;
+        if(response != null) {
+            for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
+                String docId = beat.getDocid();
+                //We get documents in descending time order.  This will set the id to the latest document
+                if (currentId == null) {
+                    currentId = docId;
+                }
+                //We only want to process documents that we know we have not seen before
+                if (lastID != null && lastID.equals(docId)) {
+                    matched = true;
+                    break;
+                }
+                StreamsDatum item = new StreamsDatum(beat, docId);
+                item.getMetadata().put("heartbeat", this.heartbeatId);
+                this.provider.enqueueItem(item);
             }
-            StreamsDatum item = new StreamsDatum(beat, docId);
-            item.getMetadata().put("heartbeat", this.heartbeatId);
-            this.provider.enqueueItem(item);
         }
         return new QueryResult(matched, currentId);
     }
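
The change above is what lets the heartbeat survive a bad poll: if executeAPIRequest() lets an exception escape and the runnable is driven by a ScheduledExecutorService, the scheduler silently cancels every future execution of that task, which matches the "wouldn't start again" symptom in the commit message. A minimal sketch of the keep-alive pattern, assuming periodic scheduling (the scheduler, interval, and method names here are illustrative, not from the Sysomos provider):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class HeartbeatSketch {
        public static void main(String[] args) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        pollApiOnce();   // may throw on network or API errors
                    } catch (Exception e) {
                        // Catch-and-log: an exception that escapes run() would make
                        // the scheduler suppress all subsequent executions.
                        System.err.println("Error querying API: " + e.getMessage());
                    }
                }
            }, 0, 30, TimeUnit.SECONDS);
        }

        private static void pollApiOnce() { /* fetch a batch, enqueue unseen documents */ }
    }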