You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/21 16:25:11 UTC

git commit: Updated counters with locks and tracking requests

Repository: incubator-streams
Updated Branches:
  refs/heads/master a4573f247 -> 905cc8c77


Updated counters with locks and tracking requests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/905cc8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/905cc8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/905cc8c7

Branch: refs/heads/master
Commit: 905cc8c77db6f5c78548e9ba109d8a71001285ae
Parents: a4573f2
Author: mfranklin <mf...@apache.org>
Authored: Wed May 21 10:04:49 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed May 21 10:04:49 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 120 ++++++++++++++-----
 1 file changed, 91 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/905cc8c7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 271521c..b913ec5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -1,14 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -76,6 +96,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     protected Thread task;
 
     protected volatile Queue<StreamsDatum> persistQueue;
+    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
 
     private volatile int currentItems = 0;
     private volatile int totalSent = 0;
@@ -206,8 +227,17 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             // before they close, check to ensure that
             this.flush();
 
+            this.lock.writeLock().lock();
+
             int count = 0;
             // We are going to give it 5 minutes.
+            for(ListenableActionFuture<BulkResponse> future : responses) {
+                if(future.isDone() || future.isCancelled()) {
+                    BulkResponse response = future.get();
+                    LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
+                    updateTotals(response, 0, 0);
+                }
+            }
             while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
                 Thread.sleep(50);
 
@@ -251,6 +281,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             // this line of code should be logically unreachable.
             LOGGER.warn("This is unexpected: {}", e.getMessage());
             e.printStackTrace();
+        } finally {
+            this.lock.writeLock().unlock();
         }
     }
 
@@ -527,7 +559,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
         start();
     }
-    
+
     /**
      * This method is to ONLY be called by flushInternal otherwise the counts will be off.
      * @param bulkRequest
@@ -535,49 +567,61 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
      * @param thisSizeInBytes
      */
     private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
-        final Object messenger = new Object();
         LOGGER.debug("Attempting to write {} items to ES", thisSent);
-        bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
+        this.addResponseFuture(responseFuture);
+        responseFuture.addListener(new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse bulkItemResponses) {
                 lastWrite.set(System.currentTimeMillis());
+                removeResponseFuture(responseFuture);
 
-                if (bulkItemResponses.hasFailures())
-                    LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
-
-                long thisFailed = 0;
-                long thisOk = 0;
-                long thisMillis = bulkItemResponses.getTookInMillis();
-
-                // keep track of the number of totalFailed and items that we have totalOk.
-                for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-                    if (resp.isFailed())
-                        thisFailed++;
-                    else
-                        thisOk++;
-                }
-
-                totalAttempted += thisSent;
-                totalOk += thisOk;
-                totalFailed += thisFailed;
-                totalSeconds += (thisMillis / 1000);
-
-                if (thisSent != (thisOk + thisFailed))
-                    LOGGER.error("We sent more items than this");
-
-                LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
-                        MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
-                        MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
             }
 
             @Override
             public void onFailure(Throwable e) {
                 LOGGER.error("Error bulk loading: {}", e.getMessage());
+                removeResponseFuture(responseFuture);
                 e.printStackTrace();
             }
         });
     }
 
+    /** Folds one bulk response into the running totals (thisSent = items attempted; thisSizeInBytes is only logged). */
+    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
+        if (bulkItemResponses.hasFailures())
+            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
+
+        long thisFailed = 0;
+        long thisOk = 0;
+        long thisMillis = bulkItemResponses.getTookInMillis();
+
+        // keep track of the number of totalFailed and items that we have totalOk.
+        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+            if (resp.isFailed())
+                thisFailed++;
+            else
+                thisOk++;
+        }
+
+        lock.writeLock().lock(); // acquire outside try: finally must only release a lock that is actually held
+        try {
+            totalAttempted += thisSent;
+            totalOk += thisOk;
+            totalFailed += thisFailed;
+            totalSeconds += (thisMillis / 1000);
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (thisSent != (thisOk + thisFailed))
+            LOGGER.error("We sent more items than this");
+
+        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
+                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+    }
 
 
     private void checkAndCreateBulkRequest() {
@@ -591,5 +635,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         }
     }
 
+    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
+        lock.writeLock().lock(); // acquire outside try: finally must only release a lock that is actually held
+        try {
+            this.responses.add(future); // remember the in-flight bulk request until its listener removes it
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void removeResponseFuture(ListenableActionFuture<BulkResponse> future) {
+        lock.writeLock().lock(); // acquire outside try: finally must only release a lock that is actually held
+        try {
+            this.responses.remove(future); // stop tracking a bulk request whose listener has been notified
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
 }