You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/21 16:25:11 UTC
git commit: Updated counters with locks and tracking requests
Repository: incubator-streams
Updated Branches:
refs/heads/master a4573f247 -> 905cc8c77
Updated counters with locks and tracking requests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/905cc8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/905cc8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/905cc8c7
Branch: refs/heads/master
Commit: 905cc8c77db6f5c78548e9ba109d8a71001285ae
Parents: a4573f2
Author: mfranklin <mf...@apache.org>
Authored: Wed May 21 10:04:49 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed May 21 10:04:49 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 120 ++++++++++++++-----
1 file changed, 91 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/905cc8c7/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 271521c..b913ec5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -1,14 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.streams.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -76,6 +96,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
protected Thread task;
protected volatile Queue<StreamsDatum> persistQueue;
+ protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
private volatile int currentItems = 0;
private volatile int totalSent = 0;
@@ -206,8 +227,17 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
// before they close, check to ensure that
this.flush();
+ this.lock.writeLock().lock();
+
int count = 0;
// We are going to give it 5 minutes.
+ for(ListenableActionFuture<BulkResponse> future : responses) {
+ if(future.isDone() || future.isCancelled()) {
+ BulkResponse response = future.get();
+ LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
+ updateTotals(response, 0, 0);
+ }
+ }
while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
Thread.sleep(50);
@@ -251,6 +281,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
// this line of code should be logically unreachable.
LOGGER.warn("This is unexpected: {}", e.getMessage());
e.printStackTrace();
+ } finally {
+ this.lock.writeLock().unlock();
}
}
@@ -527,7 +559,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
start();
}
-
+
/**
* This method is to ONLY be called by flushInternal otherwise the counts will be off.
* @param bulkRequest
@@ -535,49 +567,61 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
* @param thisSizeInBytes
*/
private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
- final Object messenger = new Object();
LOGGER.debug("Attempting to write {} items to ES", thisSent);
- bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+ final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
+ this.addResponseFuture(responseFuture);
+ responseFuture.addListener(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
lastWrite.set(System.currentTimeMillis());
+ removeResponseFuture(responseFuture);
- if (bulkItemResponses.hasFailures())
- LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
-
- long thisFailed = 0;
- long thisOk = 0;
- long thisMillis = bulkItemResponses.getTookInMillis();
-
- // keep track of the number of totalFailed and items that we have totalOk.
- for (BulkItemResponse resp : bulkItemResponses.getItems()) {
- if (resp.isFailed())
- thisFailed++;
- else
- thisOk++;
- }
-
- totalAttempted += thisSent;
- totalOk += thisOk;
- totalFailed += thisFailed;
- totalSeconds += (thisMillis / 1000);
-
- if (thisSent != (thisOk + thisFailed))
- LOGGER.error("We sent more items than this");
-
- LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
- MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
- MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+ updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
}
@Override
public void onFailure(Throwable e) {
LOGGER.error("Error bulk loading: {}", e.getMessage());
+ removeResponseFuture(responseFuture);
e.printStackTrace();
}
});
}
+ private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
+ if (bulkItemResponses.hasFailures())
+ LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
+
+ long thisFailed = 0;
+ long thisOk = 0;
+ long thisMillis = bulkItemResponses.getTookInMillis();
+
+ // keep track of the number of totalFailed and items that we have totalOk.
+ for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+ if (resp.isFailed())
+ thisFailed++;
+ else
+ thisOk++;
+ }
+
+ try {
+ lock.writeLock().lock();
+ totalAttempted += thisSent;
+ totalOk += thisOk;
+ totalFailed += thisFailed;
+ totalSeconds += (thisMillis / 1000);
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if (thisSent != (thisOk + thisFailed))
+ LOGGER.error("We sent more items than this");
+
+ LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+ MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
+ MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+
+ }
private void checkAndCreateBulkRequest() {
@@ -591,5 +635,23 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
}
+ private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
+ try {
+ lock.writeLock().lock();
+ this.responses.add(future);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void removeResponseFuture(ListenableActionFuture<BulkResponse> future) {
+ try {
+ lock.writeLock().lock();
+ this.responses.remove(future);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
}