Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/02 18:13:52 UTC
[01/12] git commit: STREAMS-113 | Added ability to convert a Twitter User object to an Activity object.
Repository: incubator-streams
Updated Branches:
refs/heads/instagram a3f443fe7 -> 815ce2abd
STREAMS-113 | Added ability to convert a Twitter User object to an Activity object.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bc9fe492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bc9fe492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bc9fe492
Branch: refs/heads/instagram
Commit: bc9fe49270e9b64e2a8deaed7924b2ab4ff22458
Parents: 829f805
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 23 14:07:09 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 23 14:07:09 2014 -0500
----------------------------------------------------------------------
.../twitter/processor/TwitterTypeConverter.java | 20 +++++-
.../TwitterJsonActivitySerializer.java | 21 ++++++
.../TwitterJsonUserActivitySerializer.java | 72 ++++++++++++++++++++
.../serializer/util/TwitterActivityUtil.java | 22 ++++++
4 files changed, 134 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index e2119a5..0737e39 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.twitter.processor;
import com.fasterxml.jackson.core.JsonParseException;
@@ -158,7 +176,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
node = (ObjectNode)mapper.valueToTree(item);
// since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass((String)item);
+ Class inClass = TwitterEventClassifier.detectClass(item.toString());
Object out = convert(node, inClass, outClass);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index cae2ff8..5b5be96 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.streams.twitter.serializer;
import org.apache.commons.lang.NotImplementedException;
@@ -20,6 +38,7 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+ TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer();
@Override
public String serializationFormat() {
@@ -43,6 +62,8 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
activity = retweetActivitySerializer.deserialize(serialized);
else if( documentSubType == Delete.class )
activity = deleteActivitySerializer.deserialize(serialized);
+ else if( documentSubType == User.class )
+ activity = userActivitySerializer.deserialize(serialized);
else throw new ActivitySerializerException("unrecognized type");
return activity;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
new file mode 100644
index 0000000..2ae5355
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.User;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
+
+public class TwitterJsonUserActivitySerializer implements ActivitySerializer<String>, Serializable {
+
+ public TwitterJsonUserActivitySerializer() {}
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public String serialize(Activity deserialized) throws ActivitySerializerException {
+ return null;
+ }
+
+ @Override
+ public Activity deserialize(String event) throws ActivitySerializerException {
+
+ ObjectMapper mapper = new StreamsJacksonMapper();
+ User user = null;
+ try {
+ user = mapper.readValue(event, User.class);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ Activity activity = new Activity();
+ updateActivity(user, activity);
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
index feb6978..56b7005 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
@@ -79,6 +79,17 @@ public class TwitterActivityUtil {
}
/**
+ * Updates the given Activity object with the values from the User
+ * @param user the object to use as the source
+ * @param activity the target of the updates. Will receive all values from the user.
+ * @throws ActivitySerializerException
+ */
+ public static void updateActivity(User user, Activity activity) throws ActivitySerializerException {
+ activity.setActor(buildActor(user));
+ activity.setId(null);
+ }
+
+ /**
* Updates the activity for a delete event
* @param delete the delete event
* @param activity the Activity object to update
@@ -163,6 +174,17 @@ public class TwitterActivityUtil {
public static Actor buildActor(Tweet tweet) {
Actor actor = new Actor();
User user = tweet.getUser();
+
+ return buildActor(user);
+ }
+
+ /**
+ * Builds the activity {@link org.apache.streams.pojo.json.Actor} object from the User
+ * @param user the object to use as the source
+ * @return a valid Actor populated from the User
+ */
+ public static Actor buildActor(User user) {
+ Actor actor = new Actor();
actor.setId(formatId(
Optional.fromNullable(
user.getIdStr())
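A minimal usage sketch of the new serializer, for orientation (the sample JSON fields and the printed property are illustrative assumptions, not part of the commit):

    import org.apache.streams.exceptions.ActivitySerializerException;
    import org.apache.streams.pojo.json.Activity;
    import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;

    public class UserToActivityExample {
        public static void main(String[] args) throws ActivitySerializerException {
            // Hypothetical raw Twitter User JSON; real payloads carry many more fields.
            String userJson = "{\"id_str\":\"12345\",\"screen_name\":\"example\"}";
            TwitterJsonUserActivitySerializer serializer = new TwitterJsonUserActivitySerializer();
            // deserialize() binds the JSON to a User and maps it onto a new
            // Activity via TwitterActivityUtil.updateActivity(user, activity).
            Activity activity = serializer.deserialize(userJson);
            System.out.println(activity.getActor().getId());
        }
    }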
[07/12] git commit: Fixed build issues with PR#45
Posted by mf...@apache.org.
Fixed build issues with PR#45
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d91c4a44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d91c4a44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d91c4a44
Branch: refs/heads/instagram
Commit: d91c4a444cba9bc805e3b04fd1914e86aa8c194f
Parents: 7d65fb2
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:45:55 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:45:55 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchPersistUpdater.java | 35 ------------
.../ElasticsearchPersistWriterTask.java | 56 --------------------
.../ElasticsearchWriterConfiguration.json | 5 ++
3 files changed, 5 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 6982862..b2e7556 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -36,7 +36,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
@@ -44,8 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,44 +100,16 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
this.batchSize = batchSize;
}
- public void setVeryLargeBulk(boolean veryLargeBulk) {
- this.veryLargeBulk = veryLargeBulk;
- }
-
private final List<String> affectedIndexes = new ArrayList<String>();
- public int getTotalOutstanding() {
- return this.totalSent - (this.totalFailed + this.totalOk);
- }
-
public long getFlushThresholdSizeInBytes() {
return flushThresholdSizeInBytes;
}
- public int getTotalSent() {
- return totalSent;
- }
-
- public int getTotalSeconds() {
- return totalSeconds;
- }
-
- public int getTotalOk() {
- return totalOk;
- }
-
- public int getTotalFailed() {
- return totalFailed;
- }
-
public int getTotalBatchCount() {
return totalBatchCount;
}
- public long getTotalSizeInBytes() {
- return totalSizeInBytes;
- }
-
public long getBatchSizeInBytes() {
return batchSizeInBytes;
}
@@ -149,10 +118,6 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
return batchItemsSent;
}
- public List<String> getAffectedIndexes() {
- return this.affectedIndexes;
- }
-
public void setFlushThresholdSizeInBytes(long sizeInBytes) {
this.flushThresholdSizeInBytes = sizeInBytes;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
deleted file mode 100644
index 824f1ac..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class ElasticsearchPersistWriterTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
-
- private ElasticsearchPersistWriter writer;
-
- public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
- this.writer = writer;
- }
-
- @Override
- public void run() {
-
- while(true) {
- if( writer.persistQueue.peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(1));
- } catch (InterruptedException e) {}
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index b107be6..a38ff85 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,6 +24,11 @@
"description": "Item Count before flush",
"default": 100
},
+ "batchBytes": {
+ "type": "integer",
+ "description": "Number of bytes before flush",
+ "default": 5242880
+ },
"maxTimeBetweenFlushMs": {
"type": "integer"
}
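The new batchBytes field also surfaces as a setter on the generated configuration bean (see setBatchBytes in the ElasticsearchConfigurator diff later in this digest). A hedged sketch of setting it programmatically:

    import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

    public class WriterConfigExample {
        public static void main(String[] args) {
            // Bean generated from ElasticsearchWriterConfiguration.json; the
            // setters mirror the schema fields shown above.
            ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
            config.setBatchSize(100L);                // item count before flush (schema default 100)
            config.setBatchBytes(5L * 1024L * 1024L); // bytes before flush (schema default 5242880)
        }
    }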
[11/12] git commit: Merge PR#42 from robdouglas 'rdouglas/STREAMS-113'
Posted by mf...@apache.org.
Merge PR#42 from robdouglas 'rdouglas/STREAMS-113'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c1246d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c1246d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c1246d96
Branch: refs/heads/instagram
Commit: c1246d963822aa9c2f8fd9408b0b781935535596
Parents: ba96627 42debdf
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 2 09:21:50 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 2 09:21:50 2014 -0400
----------------------------------------------------------------------
.../twitter/processor/TwitterTypeConverter.java | 2 +-
.../TwitterJsonActivitySerializer.java | 3 +
.../TwitterJsonUserActivitySerializer.java | 72 ++++++++++++++++++++
.../serializer/util/TwitterActivityUtil.java | 22 ++++++
4 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[10/12] git commit: Added default initial time for Sysomos
Posted by mf...@apache.org.
Added default initial time for Sysomos
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ba96627c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ba96627c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ba96627c
Branch: refs/heads/instagram
Commit: ba96627ca1eb5a52b9c44cfd0a66a7cb58539503
Parents: 31888f9
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 11:03:38 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 11:03:38 2014 -0400
----------------------------------------------------------------------
.../apache/streams/sysomos/provider/SysomosHeartbeatStream.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ba96627c/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 946572a..8ad5ed7 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -49,7 +49,7 @@ public class SysomosHeartbeatStream implements Runnable {
private boolean enabled = true;
public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
- this(provider, heartbeatId, OperatingMode.DATE);
+ this(provider, heartbeatId, null, DateTime.now());
}
public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) {
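In effect, the convenience constructor now seeds a concrete time window instead of an OperatingMode; annotated (code as committed, comment added):

    public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
        // Default initial time: beforeTime left unset, afterTime pinned to
        // the moment the stream is constructed.
        this(provider, heartbeatId, null, DateTime.now());
    }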
[05/12] git commit: STREAMS-119 | Added support for isRunning
Posted by mf...@apache.org.
STREAMS-119 | Added support for isRunning
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/da4d9ed8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/da4d9ed8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/da4d9ed8
Branch: refs/heads/instagram
Commit: da4d9ed801b212d6e6f1f351b609842f14c79bba
Parents: 34c95a6
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 27 14:52:27 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 27 14:52:27 2014 -0400
----------------------------------------------------------------------
.../local/tasks/StreamsProviderTask.java | 51 +++++++++-------
.../local/builders/LocalStreamBuilderTest.java | 52 +++++++++++++---
.../local/test/processors/SlowProcessor.java | 50 ++++++++++++++++
.../test/providers/EmptyResultSetProvider.java | 2 +-
.../test/providers/NumericMessageProvider.java | 63 ++++----------------
.../test/component/FileReaderProvider.java | 52 ++++------------
6 files changed, 151 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index f384380..ed95cdd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -35,7 +35,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
- @Override
public DatumStatusCounter getDatumStatusCounter() {
return this.statusCounter;
}
@@ -51,12 +50,13 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
private static final int END = 1;
private StreamsProvider provider;
- private AtomicBoolean keepRunning;
+ private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+ private final AtomicBoolean flushing = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private Type type;
private BigInteger sequence;
private DateTime[] dateRange;
private Map<String, Object> config;
- private AtomicBoolean isRunning;
private int timeout;
private int zeros = 0;
@@ -72,8 +72,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.type = Type.PERPETUAL;
else
this.type = Type.READ_CURRENT;
- this.keepRunning = new AtomicBoolean(true);
- this.isRunning = new AtomicBoolean(true);
this.timeout = DEFAULT_TIMEOUT_MS;
}
@@ -86,8 +84,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.provider = provider;
this.type = Type.READ_NEW;
this.sequence = sequence;
- this.keepRunning = new AtomicBoolean(true);
- this.isRunning = new AtomicBoolean(true);
this.timeout = DEFAULT_TIMEOUT_MS;
}
@@ -103,8 +99,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.dateRange = new DateTime[2];
this.dateRange[START] = start;
this.dateRange[END] = end;
- this.keepRunning = new AtomicBoolean(true);
- this.isRunning = new AtomicBoolean(true);
this.timeout = DEFAULT_TIMEOUT_MS;
}
@@ -114,6 +108,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
@Override
public void stopTask() {
+ LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
this.keepRunning.set(false);
}
@@ -133,12 +128,12 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
try {
this.provider.prepare(this.config); //TODO allow for configuration objects
StreamsResultSet resultSet = null;
- this.isRunning.set(true);
long maxZeros = timeout / DEFAULT_SLEEP_TIME_MS;
switch(this.type) {
case PERPETUAL: {
provider.startStream();
- while(this.keepRunning.get()) {
+ this.started.set(true);
+ while(this.isRunning()) {
try {
resultSet = provider.readCurrent();
if( resultSet.size() == 0 )
@@ -152,37 +147,51 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
this.keepRunning.set(false);
Thread.sleep(DEFAULT_SLEEP_TIME_MS);
} catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted");
this.keepRunning.set(false);
}
}
}
break;
- case READ_CURRENT: resultSet = this.provider.readCurrent();
+ case READ_CURRENT:
+ resultSet = this.provider.readCurrent();
+ this.started.set(true);
break;
- case READ_NEW: resultSet = this.provider.readNew(this.sequence);
+ case READ_NEW:
+ resultSet = this.provider.readNew(this.sequence);
+ this.started.set(true);
break;
- case READ_RANGE: resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+ case READ_RANGE:
+ resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+ this.started.set(true);
break;
default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
}
flushResults(resultSet);
} catch( Exception e ) {
- e.printStackTrace();
- } finally
- {
+ LOGGER.error("Error in processing provider stream", e);
+ } finally {
+ LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
+ this.keepRunning.set(false);
this.provider.cleanUp();
- this.isRunning.set(false);
}
}
@Override
public boolean isRunning() {
- return this.isRunning.get();
+ //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
+ //or whether we have been told to shut down. If someone really wants us to shut down, they will interrupt the
+ //thread and force us to shutdown. We also want to make sure we have had the opportunity to run before the
+ //runtime kills us.
+ return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
}
public void flushResults(StreamsResultSet resultSet) {
- for(StreamsDatum datum : resultSet) {
+ Queue<StreamsDatum> queue = resultSet.getQueue();
+ this.flushing.set(true);
+ while(!queue.isEmpty()) {
+ StreamsDatum datum = queue.poll();
if(!this.keepRunning.get()) {
break;
}
@@ -198,10 +207,12 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
try {
Thread.sleep(DEFAULT_SLEEP_TIME_MS);
} catch (InterruptedException e) {
+ LOGGER.warn("Thread interrupted");
this.keepRunning.set(false);
}
}
}
+ this.flushing.set(false);
}
}
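The reworked liveness predicate is the crux of STREAMS-119; restated with annotations (logic as committed, comments added):

    // 1. Until the provider has actually started producing (started == false),
    //    report running so the runtime cannot reap the task prematurely.
    // 2. While buffered results are flushing, report running regardless of
    //    provider state or the stop flag.
    // 3. Otherwise defer to the provider itself plus the external stop flag.
    return !this.started.get() || this.flushing.get()
            || (this.provider.isRunning() && this.keepRunning.get());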
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index ef05eb6..5a40865 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.core.test.providers.NumericMessageProvider;
import org.apache.streams.core.test.writer.SystemOutWriter;
import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.test.processors.SlowProcessor;
import org.apache.streams.local.test.providers.EmptyResultSetProvider;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +37,10 @@ import java.io.PrintStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Scanner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.*;
@@ -185,17 +190,24 @@ public class LocalStreamBuilderTest {
}
@Test
- public void testDefaultProviderTimeout() {
- long start = System.currentTimeMillis();
- StreamBuilder builder = new LocalStreamBuilder();
- builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
- .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
- .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+ public void testSlowProcessorBranch() {
+ int numDatums = 30;
+ int timeout = 2000;
+ Map<String, Object> config = Maps.newHashMap();
+ config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+ StreamBuilder builder = new LocalStreamBuilder(config);
+ builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
+ .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
.addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
builder.start();
- long end = System.currentTimeMillis();
- //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
- assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(StreamsTask.DEFAULT_TIMEOUT_MS), lessThanOrEqualTo(2 * (StreamsTask.DEFAULT_TIMEOUT_MS)))));
+ int count = 0;
+ Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+ while(scanner.hasNextLine()) {
+ ++count;
+ scanner.nextLine();
+ }
+ assertThat(count, greaterThan(numDatums)); // using > because number of lines in system.out is non-deterministic
+
}
@Test
@@ -214,4 +226,26 @@ public class LocalStreamBuilderTest {
//We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound
assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
}
+
+ @Test
+ public void ensureShutdownWithBlockedQueue() throws InterruptedException {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ int before = Thread.activeCount();
+ final StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(1));
+ builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
+ .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+ .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+ service.submit(new Runnable(){
+ @Override
+ public void run() {
+ builder.start();
+ }
+ });
+ //Let streams spin up threads and start to process
+ Thread.sleep(500);
+ builder.stop();
+ service.shutdownNow();
+ service.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ assertThat(Thread.activeCount(), is(equalTo(before)));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
new file mode 100644
index 0000000..93ec060
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.local.test.processors;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.List;
+
+/**
+ */
+public class SlowProcessor implements StreamsProcessor {
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return Lists.newArrayList(entry);
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index fe60295..28b9907 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -53,7 +53,7 @@ public class EmptyResultSetProvider implements StreamsProvider {
@Override
public boolean isRunning() {
- return false;
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 159355d..379459f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -18,6 +18,7 @@
package org.apache.streams.core.test.providers;
+import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
@@ -25,7 +26,9 @@ import org.joda.time.DateTime;
import java.math.BigInteger;
import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
@@ -45,17 +48,17 @@ public class NumericMessageProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
- return new ResultSet();
+ return new StreamsResultSet(constructQueue());
}
@Override
public StreamsResultSet readNew(BigInteger sequence) {
- return new ResultSet();
+ return new StreamsResultSet(constructQueue());
}
@Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
- return new ResultSet();
+ return new StreamsResultSet(constructQueue());
}
@Override
@@ -73,53 +76,13 @@ public class NumericMessageProvider implements StreamsProvider {
}
-
- private class ResultSet extends StreamsResultSet {
-
- private ResultSet() {
- super(new ConcurrentLinkedQueue<StreamsDatum>());
- }
-
-// @Override
-// public long getStartTime() {
-// return 0;
-// }
-//
-// @Override
-// public long getEndTime() {
-// return 0;
-// }
-//
-// @Override
-// public String getSourceId() {
-// return null;
-// }
-//
-// @Override
-// public BigInteger getMaxSequence() {
-// return null;
-// }
-
- @Override
- public Iterator<StreamsDatum> iterator() {
- return new Iterator<StreamsDatum>() {
- private int i = 0;
-
- @Override
- public boolean hasNext() {
- return i < numMessages;
- }
-
- @Override
- public StreamsDatum next() {
- return new StreamsDatum(i++);
- }
-
- @Override
- public void remove() {
-
- }
- };
+ private Queue<StreamsDatum> constructQueue() {
+ Queue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
+ for(int i=0;i<numMessages;i++) {
+ datums.add(new StreamsDatum(i));
}
+ return datums;
}
}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 9c1ae97..1e0eb88 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -18,17 +18,13 @@
package org.apache.streams.test.component;
+import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.joda.time.DateTime;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
import java.math.BigInteger;
-import java.sql.ResultSet;
-import java.util.Iterator;
import java.util.Queue;
import java.util.Scanner;
@@ -42,7 +38,6 @@ import java.util.Scanner;
public class FileReaderProvider implements StreamsProvider {
private String fileName;
- private InputStream inStream;
private Scanner scanner;
private StreamsDatumConverter converter;
@@ -58,7 +53,7 @@ public class FileReaderProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
- return new ResultSet();
+ return new StreamsResultSet(constructQueue(this.scanner));
}
@Override
@@ -73,7 +68,7 @@ public class FileReaderProvider implements StreamsProvider {
@Override
public boolean isRunning() {
- return this.scanner.hasNextLine();
+ return this.scanner != null && this.scanner.hasNextLine();
}
@Override
@@ -83,39 +78,18 @@ public class FileReaderProvider implements StreamsProvider {
@Override
public void cleanUp() {
- this.scanner.close();
- }
-
- private class ResultSet extends StreamsResultSet {
-
- public ResultSet() {
- super(null);
+ if(this.scanner!= null) {
+ this.scanner.close();
+ this.scanner = null;
}
+ }
-
- @Override
- public Iterator<StreamsDatum> iterator() {
- return new FileProviderIterator();
- }
-
- private class FileProviderIterator implements Iterator<StreamsDatum> {
-
-
-
- @Override
- public boolean hasNext() {
- return scanner.hasNextLine();
- }
-
- @Override
- public StreamsDatum next() {
- return converter.convert(scanner.nextLine());
- }
-
- @Override
- public void remove() {
-
- }
+ private Queue<StreamsDatum> constructQueue(Scanner scanner) {
+ Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+ while(scanner.hasNextLine()) {
+ data.add(converter.convert(scanner.nextLine()));
}
+ cleanUp();
+ return data;
}
}
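Both test providers now back StreamsResultSet with a real Queue because the reworked flushResults() drains datums via getQueue()/poll() rather than the Iterable view. A minimal sketch of the pattern (assuming the queue-based StreamsResultSet constructor shown above):

    import com.google.common.collect.Queues;
    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.core.StreamsResultSet;

    import java.util.Queue;

    public class QueueBackedResultSetExample {
        public static StreamsResultSet resultSetOf(int n) {
            Queue<StreamsDatum> datums = Queues.newArrayBlockingQueue(n);
            for (int i = 0; i < n; i++) {
                datums.add(new StreamsDatum(i));  // datum wrapping the integer payload
            }
            return new StreamsResultSet(datums);
        }
    }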
[12/12] git commit: Merge branch 'master' into instagram
Posted by mf...@apache.org.
Merge branch 'master' into instagram
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/815ce2ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/815ce2ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/815ce2ab
Branch: refs/heads/instagram
Commit: 815ce2abdab1be8e279b48a7e38090f5f6948fcb
Parents: a3f443f c1246d9
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 2 12:13:35 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 2 12:13:35 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +
.../ElasticsearchPersistUpdater.java | 35 -
.../ElasticsearchPersistWriter.java | 705 +++++++------------
.../ElasticsearchPersistWriterTask.java | 56 --
.../ElasticsearchWriterConfiguration.json | 5 +
.../provider/SysomosHeartbeatStream.java | 2 +-
.../twitter/processor/TwitterTypeConverter.java | 2 +-
.../TwitterJsonActivitySerializer.java | 3 +
.../TwitterJsonUserActivitySerializer.java | 72 ++
.../serializer/util/TwitterActivityUtil.java | 22 +
.../local/tasks/StreamsProviderTask.java | 51 +-
.../local/builders/LocalStreamBuilderTest.java | 52 +-
.../local/test/processors/SlowProcessor.java | 50 ++
.../test/providers/EmptyResultSetProvider.java | 2 +-
.../test/providers/NumericMessageProvider.java | 63 +-
.../test/component/FileReaderProvider.java | 52 +-
16 files changed, 517 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
[04/12] git commit: good catch Matt F.
Posted by mf...@apache.org.
good catch Matt F.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e829d8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e829d8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e829d8d
Branch: refs/heads/instagram
Commit: 2e829d8dcd3deb25ec463fe5f15bb7e8c7d32edf
Parents: e0e4ad4
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri Jun 27 11:17:24 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri Jun 27 11:17:24 2014 -0500
----------------------------------------------------------------------
.../apache/streams/elasticsearch/ElasticsearchPersistWriter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e829d8d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c40cbfa..5956808 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -243,7 +243,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
try {
Thread.yield();
Thread.sleep(1);
- timeOutThresholdInMS++;
+ counter++;
} catch(InterruptedException ie) {
// No Operation
}
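A hedged reconstruction of the surrounding wait loop, to show why the one-word fix matters; only the two counters and the try body appear in the diff, while the loop condition and waitingOnWork() are assumptions:

    long counter = 0;
    while (waitingOnWork() && counter < timeOutThresholdInMS) {
        try {
            Thread.yield();
            Thread.sleep(1);
            counter++;   // was timeOutThresholdInMS++, which pushed the
                         // deadline back on every iteration, so the loop
                         // could only exit when the work drained on its own.
        } catch (InterruptedException ie) {
            // No Operation
        }
    }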
[09/12] git commit: Updated Elasticsearch defaults with lower backoff
Posted by mf...@apache.org.
Updated Elasticsearch defaults with lower backoff
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/31888f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/31888f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/31888f99
Branch: refs/heads/instagram
Commit: 31888f99c8c17c20b5c90d9d661d9f44bff6c7c6
Parents: d91c4a4
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 10:35:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 10:35:31 2014 -0400
----------------------------------------------------------------------
.../streams/elasticsearch/ElasticsearchPersistWriter.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/31888f99/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 5956808..72145e1 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -59,9 +59,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
- private static final long WAITING_DOCS_LIMIT = 10000;
- private static final long DEFAULT_MAX_WAIT = 10000;
private static final int DEFAULT_BATCH_SIZE = 100;
+ //ES defaults its bulk index queue to 50 items. We want to be under this on our backoff so set this to 1/2 ES default
+ //at a batch size as configured here.
+ private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25;
+ //A document should have to wait no more than 10s to get flushed
+ private static final long DEFAULT_MAX_WAIT = 10000;
private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
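The arithmetic behind the new default, spelled out (all values from the diff):

    private static final int DEFAULT_BATCH_SIZE = 100;  // docs per bulk request
    // 25 batches of 100 docs = 2,500 queued docs = 25 outstanding bulk
    // requests, i.e. half of Elasticsearch's default bulk queue depth of 50.
    private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25;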
[08/12] git commit: STREAMS-113 | Code review feedback
Posted by mf...@apache.org.
STREAMS-113 | Code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/42debdf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/42debdf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/42debdf2
Branch: refs/heads/instagram
Commit: 42debdf2206fc99dc6a57f64e673a71a120adcf2
Parents: bc9fe49
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 09:29:18 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 09:29:18 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/twitter/processor/TwitterTypeConverter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/42debdf2/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 0737e39..74cce27 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -176,7 +176,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
node = (ObjectNode)mapper.valueToTree(item);
// since data is coming from outside provider, we don't know what type the events are
- Class inClass = TwitterEventClassifier.detectClass(item.toString());
+ Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
Object out = convert(node, inClass, outClass);
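The review point: item is not guaranteed to be a String (the surrounding code already treats it as an arbitrary object via valueToTree), and the earlier cast or toString() on a typed POJO yields a ClassCastException or non-JSON output. Serializing through the mapper hands the classifier real JSON. An annotated excerpt (comment added; note writeValueAsString throws the checked JsonProcessingException):

    // For a POJO item, toString() gives Object/ToStringBuilder output, not
    // JSON, so classification by content would fail; round-trip it instead.
    String json = mapper.writeValueAsString(item);
    Class inClass = TwitterEventClassifier.detectClass(json);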
[03/12] git commit: changes from comments.
Posted by mf...@apache.org.
changes from comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e0e4ad4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e0e4ad4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e0e4ad4a
Branch: refs/heads/instagram
Commit: e0e4ad4a7d1e12873b1aa1598fd7658f23462c2a
Parents: 1d36ab6
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri Jun 27 10:03:58 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri Jun 27 10:03:58 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 61 +-------------------
1 file changed, 2 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e0e4ad4a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 95f5f57..c40cbfa 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -157,12 +157,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
return docAsJson;
else {
ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
- try {
- node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
- }
- catch(Throwable e) {
- LOGGER.warn("Unable to write metadata");
- }
+ node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
return OBJECT_MAPPER.writeValueAsString(node);
}
}
@@ -180,8 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
} catch (Throwable e) {
// this line of code should be logically unreachable.
- LOGGER.warn("This is unexpected: {}", e.getMessage());
- e.printStackTrace();
+ LOGGER.warn("This is unexpected: {}", e);
}
}
@@ -316,27 +310,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
add(indexRequestBuilder.request());
}
- /**
- * This function is trashed... needs to be fixed.
- *
- private synchronized void add(UpdateRequest request) {
- Preconditions.checkNotNull(request);
- checkAndCreateBulkRequest();
-
- checkIndexImplications(request.index());
-
- bulkRequest.add(request);
- try {
- Optional<Integer> size = Objects.firstNonNull(
- Optional.fromNullable(request.doc().source().length()),
- Optional.fromNullable(request.script().length()));
- trackItemAndBytesWritten(size.get().longValue());
- } catch (NullPointerException x) {
- trackItemAndBytesWritten(1000);
- }
- }
- */
-
protected void add(IndexRequest request) {
Preconditions.checkNotNull(request);
@@ -424,36 +397,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
}
}
- /**
- *
- private Set<String> checkIds(Set<String> input, String index, String type) {
-
- IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
-
- for (String s : input)
- idsFilterBuilder.addIds(s);
-
- SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
- .prepareSearch(index)
- .setTypes(type)
- .setQuery(idsFilterBuilder)
- .addField("_id")
- .setSize(input.size());
-
- SearchHits hits = searchRequestBuilder.execute()
- .actionGet()
- .getHits();
-
- Set<String> toReturn = new HashSet<String>();
-
- for (SearchHit hit : hits) {
- toReturn.add(hit.getId());
- }
-
- return toReturn;
- }
- */
-
public void prepare(Object configurationObject) {
this.veryLargeBulk = config.getBulk() == null ?
Boolean.FALSE :
[06/12] git commit: Merge PR#45 from Smashew 'smashew/Streams-ES-Writer-Fix'
Posted by mf...@apache.org.
Merge PR#45 from Smashew 'smashew/Streams-ES-Writer-Fix'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7d65fb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7d65fb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7d65fb26
Branch: refs/heads/instagram
Commit: 7d65fb26da0e4493eb440b43722469c8da6abb46
Parents: da4d9ed 2e829d8
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:28:13 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:28:13 2014 -0400
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +
.../ElasticsearchPersistWriter.java | 698 +++++++------------
2 files changed, 257 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
[02/12] git commit: Fixed the ElasticsearchPersistWriter
Posted by mf...@apache.org.
Fixed the ElasticsearchPersistWriter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1d36ab61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1d36ab61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1d36ab61
Branch: refs/heads/instagram
Commit: 1d36ab61e1046ec8c197a9be23b3234fed4cf56c
Parents: 34c95a6
Author: Matthew Hager <Ma...@gmail.com>
Authored: Thu Jun 26 15:16:24 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Thu Jun 26 15:16:24 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +
.../ElasticsearchPersistWriter.java | 693 ++++++++-----------
2 files changed, 283 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
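At its core, the rewritten writer buffers index requests and flushes whenever any one of three thresholds trips: bytes buffered, records buffered, or time since the last flush. Distilled from checkForFlush() in the diff below (a sketch of the predicate as it reads inside the writer; the real method inlines it):

// flush when any one of the three thresholds is crossed
boolean shouldFlush =
        currentBatchBytes.get() >= flushThresholdBytes
     || currentBatchItems.get() >= flushThresholdsRecords
     || new Date().getTime() - lastFlush >= flushThresholdTime;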
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..4507fb5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -77,6 +77,10 @@ public class ElasticsearchConfigurator {
if( elasticsearch.hasPath("batchSize"))
elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
+ if( elasticsearch.hasPath("batchBytes"))
+ elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
+
+
elasticsearchWriterConfiguration.setIndex(index);
elasticsearchWriterConfiguration.setType(type);
elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
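The new knob reads elasticsearch.batchBytes from the typesafe config; the same thresholds can also be set programmatically. A hypothetical wiring using only setters that appear in this diff (the index/type names and sizes are illustrative, not taken from the commit):

ElasticsearchWriterConfiguration writerConfiguration =
        ElasticsearchConfigurator.detectWriterConfiguration(
                StreamsConfigurator.config.getConfig("elasticsearch"));

writerConfiguration.setIndex("activities");              // illustrative
writerConfiguration.setType("activity");                 // illustrative
writerConfiguration.setBatchSize(100L);                  // flush after 100 records...
writerConfiguration.setBatchBytes(5L * 1024L * 1024L);   // ...or 5 MB, whichever comes first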
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index da5916b..95f5f57 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -16,20 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.streams.elasticsearch;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -39,321 +37,227 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonParser;
-import java.io.Closeable;
-import java.io.Flushable;
import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.Serializable;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
- public static final String STREAMS_ID = "ElasticsearchPersistWriter";
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
- public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
+ public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
private static final long WAITING_DOCS_LIMIT = 10000;
- private static final int BYTES_IN_MB = 1024 * 1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
private static final long DEFAULT_MAX_WAIT = 10000;
private static final int DEFAULT_BATCH_SIZE = 100;
+ private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+
private final List<String> affectedIndexes = new ArrayList<String>();
- private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
- //Primary lock for preventing multiple synchronous batches with the same data
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
- //Create independent locks to synchronize updates that have nothing to do with actually sending data
- private final Object countLock = new Object();
- private final Object requestLock = new Object();
-
- private ObjectMapper mapper = new StreamsJacksonMapper();
- private ElasticsearchClientManager manager;
- private ElasticsearchWriterConfiguration config;
- private Client client;
- private String parentID = null;
+
+ private final ElasticsearchClientManager manager;
+ private final ElasticsearchWriterConfiguration config;
+
private BulkRequestBuilder bulkRequest;
- private OutputStreamWriter currentWriter = null;
- private int batchSize;
- private long maxTimeBetweenFlushMs;
+
private boolean veryLargeBulk = false; // by default this setting is set to false
+ private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
+ private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
- protected Thread task;
-
- protected volatile Queue<StreamsDatum> persistQueue;
- protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
-
- private volatile int currentItems = 0;
- private volatile int totalSent = 0;
- private volatile int totalSeconds = 0;
- private volatile int totalAttempted = 0;
- private volatile int totalOk = 0;
- private volatile int totalFailed = 0;
- private volatile int totalBatchCount = 0;
- private volatile int totalRecordsWritten = 0;
- private volatile long totalSizeInBytes = 0;
- private volatile long batchSizeInBytes = 0;
- private volatile int batchItemsSent = 0;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
- private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+ private long flushThresholdTime = DEFAULT_MAX_WAIT;
+ private long lastFlush = new Date().getTime();
+ private Timer timer = new Timer();
- public ElasticsearchPersistWriter() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
- }
- public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
- this.config = config;
- }
+ private final AtomicInteger batchesSent = new AtomicInteger(0);
+ private final AtomicInteger batchesResponded = new AtomicInteger(0);
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
+ private final AtomicLong currentBatchItems = new AtomicLong(0);
+ private final AtomicLong currentBatchBytes = new AtomicLong(0);
- public void setVeryLargeBulk(boolean veryLargeBulk) {
- this.veryLargeBulk = veryLargeBulk;
- }
+ private final AtomicLong totalSent = new AtomicLong(0);
+ private final AtomicLong totalSeconds = new AtomicLong(0);
+ private final AtomicLong totalOk = new AtomicLong(0);
+ private final AtomicLong totalFailed = new AtomicLong(0);
+ private final AtomicLong totalSizeInBytes = new AtomicLong(0);
- public int getTotalOutstanding() {
- return this.totalSent - (this.totalFailed + this.totalOk);
- }
-
- public long getFlushThresholdSizeInBytes() {
- return flushThresholdSizeInBytes;
+ public ElasticsearchPersistWriter() {
+ this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
}
- public int getTotalSent() {
- return totalSent;
+ public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
+ this(config, new ElasticsearchClientManager(config));
}
- public int getTotalSeconds() {
- return totalSeconds;
+ public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
+ this.config = config;
+ this.manager = manager;
+ this.bulkRequest = this.manager.getClient().prepareBulk();
}
- public int getTotalOk() {
- return totalOk;
- }
+ public long getBatchesSent() { return this.batchesSent.get(); }
+ public long getBatchesResponded() { return batchesResponded.get(); }
- public int getTotalFailed() {
- return totalFailed;
- }
- public int getTotalBatchCount() {
- return totalBatchCount;
- }
+ public long getFlushThresholdsRecords() { return this.flushThresholdsRecords; }
+ public long getFlushThresholdBytes() { return this.flushThresholdBytes; }
+ public long getFlushThresholdMaxTime() { return this.flushThresholdTime; }
- public long getTotalSizeInBytes() {
- return totalSizeInBytes;
- }
+ public void setFlushThresholdRecords(long val) { this.flushThresholdsRecords = val; }
+ public void setFlushThresholdBytes(long val) { this.flushThresholdBytes = val; }
+ public void setFlushThresholdMaxTime(long val) { this.flushThresholdTime = val; }
+ public void setVeryLargeBulk(boolean veryLargeBulk) { this.veryLargeBulk = veryLargeBulk; }
- public long getBatchSizeInBytes() {
- return batchSizeInBytes;
- }
+ private long getLastFlush() { return this.lastFlush; }
- public int getBatchItemsSent() {
- return batchItemsSent;
- }
+ public long getTotalOutstanding() { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
+ public long getTotalSent() { return this.totalSent.get(); }
+ public long getTotalOk() { return this.totalOk.get(); }
+ public long getTotalFailed() { return this.totalFailed.get(); }
+ public long getTotalSizeInBytes() { return this.totalSizeInBytes.get(); }
+ public long getTotalSeconds() { return this.totalSeconds.get(); }
+ public List<String> getAffectedIndexes() { return this.affectedIndexes; }
- public List<String> getAffectedIndexes() {
- return this.affectedIndexes;
- }
+ public boolean isConnected() { return (this.manager.getClient() != null); }
- public void setFlushThresholdSizeInBytes(long sizeInBytes) {
- this.flushThresholdSizeInBytes = sizeInBytes;
- }
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+ if(streamsDatum == null || streamsDatum.getDocument() == null)
+ return;
- public long getMaxTimeBetweenFlushMs() {
- return maxTimeBetweenFlushMs;
- }
+ checkForBackOff();
- public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
- this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
+ try {
+ add(config.getIndex(), config.getType(), streamsDatum.getId(),
+ streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
+ convertAndAppendMetadata(streamsDatum));
+ } catch (Throwable e) {
+ LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
+ }
}
- public boolean isConnected() {
- return (client != null);
- }
- @Override
- public void write(StreamsDatum streamsDatum) {
+ private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
+ Object object = streamsDatum.getDocument();
- String json;
- String id = null;
- String ts = null;
- try {
- if( streamsDatum.getId() != null ) {
- id = streamsDatum.getId();
- }
- if( streamsDatum.getTimestamp() != null ) {
- ts = Long.toString(streamsDatum.getTimestamp().getMillis());
+ String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
+ if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
+ return docAsJson;
+ else {
+ ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
+ try {
+ node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
}
- if (streamsDatum.getDocument() instanceof String)
- json = streamsDatum.getDocument().toString();
- else {
- json = mapper.writeValueAsString(streamsDatum.getDocument());
+ catch(Throwable e) {
+ LOGGER.warn("Unable to write metadata");
}
-
- add(config.getIndex(), config.getType(), id, ts, json);
-
- } catch (Exception e) {
- LOGGER.warn("{} {}", e.getMessage());
- e.printStackTrace();
+ return OBJECT_MAPPER.writeValueAsString(node);
}
}
public void cleanUp() {
-
try {
- flush();
- backgroundFlushTask.shutdownNow();
- } catch (IOException e) {
- e.printStackTrace();
- }
- close();
- }
- @Override
- public void close() {
- try {
// before we close, flush anything buffered and wait for outstanding batches
- this.flush();
-
- this.lock.writeLock().lock();
-
- int count = 0;
- // We are going to give it 5 minutes.
- while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
- for(ListenableActionFuture<BulkResponse> future : responses) {
- if(future.isDone() || future.isCancelled()) {
- BulkResponse response = future.get();
- LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
- updateTotals(response, 0, 0);
- }
- }
- Thread.sleep(50);
- }
-
- if (this.getTotalOutstanding() > 0) {
- LOGGER.error("We never cleared our buffer");
- }
+ flushInternal();
+ waitToCatchUp(0, 5 * 60 * 1000);
+ refreshIndexes();
- for (String indexName : this.getAffectedIndexes()) {
- createIndexIfMissing(indexName);
+ LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
- if (this.veryLargeBulk) {
- LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
- // They are in 'very large bulk' mode and the process is finished. We now want to turn the
- // refreshing back on.
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
+ } catch (Throwable e) {
+ // this line of code should be logically unreachable.
+ LOGGER.warn("This is unexpected: {}", e.getMessage());
+ e.printStackTrace();
+ }
+ }
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
+ private void refreshIndexes() {
+ for (String indexName : this.affectedIndexes) {
- checkIndexImplications(indexName);
+ if (this.veryLargeBulk) {
+ LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
+ // They are in 'very large bulk' mode and the process is finished. We now want to turn the
+ // refreshing back on.
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+ updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
- LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+ // submit to ElasticSearch
this.manager.getClient()
.admin()
.indices()
- .prepareRefresh(indexName)
- .execute()
+ .updateSettings(updateSettingsRequest)
.actionGet();
}
- LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
+ checkIndexImplications(indexName);
- } catch (Exception e) {
- // this line of code should be logically unreachable.
- LOGGER.warn("This is unexpected: {}", e.getMessage());
- e.printStackTrace();
- } finally {
- this.lock.writeLock().unlock();
+ LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .prepareRefresh(indexName)
+ .execute()
+ .actionGet();
}
}
@Override
- public void flush() throws IOException {
- flushInternal();
- }
-
- @Override
public DatumStatusCounter getDatumStatusCounter() {
DatumStatusCounter counters = new DatumStatusCounter();
- counters.incrementAttempt(this.batchItemsSent);
- counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
- counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
+ counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
+ counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
return counters;
}
- public void start() {
- backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- LOGGER.debug("Checking to see if data needs to be flushed");
- long time = System.currentTimeMillis() - lastWrite.get();
- if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
- LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time);
- flushInternal();
- }
- }
- }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
- manager = new ElasticsearchClientManager(config);
- client = manager.getClient();
-
- LOGGER.info(client.toString());
- }
-
- public void flushInternal() {
- lock.writeLock().lock();
+ private synchronized void flushInternal() {
// we do not have a working bulk request, we can just exit here.
- if (this.bulkRequest == null || batchItemsSent == 0)
+ if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
return;
+ // wait for one minute to catch up if it needs to
+ waitToCatchUp(5, 1 * 60 * 1000);
+
// call the flush command.
- flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+ flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
- // null the flush request, this will be created in the 'add' function below
- this.bulkRequest = null;
+ // reset the current batch statistics
+ this.currentBatchItems.set(0);
+ this.currentBatchBytes.set(0);
- // record the proper statistics, and add it to our totals.
- this.totalSizeInBytes += this.batchSizeInBytes;
- this.totalSent += batchItemsSent;
+ // reset our bulk request builder
+ this.bulkRequest = this.manager.getClient().prepareBulk();
+ }
- // reset the current batch statistics
- this.batchSizeInBytes = 0;
- this.batchItemsSent = 0;
+ private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
+ int counter = 0;
+ // If more than batchThreshold batches are still outstanding, wait up to
+ // timeOutThresholdInMS milliseconds for the responses to catch up
+ while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
+ try {
+ Thread.yield();
+ Thread.sleep(1);
+ counter++;
+ } catch(InterruptedException ie) {
+ // No Operation
+ }
+ }
+ }
+ private void checkForBackOff() {
try {
- int count = 0;
if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
/****************************************************************************
* Author:
@@ -379,6 +283,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
****************************************************************************/
// wait for the flush to catch up. We are going to cap this at
+ int count = 0;
while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
Thread.sleep(10);
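(With the counter capped at 500 passes and a 10 ms sleep per pass, the back-off above blocks for at most about 500 × 10 ms = 5 seconds before the warning below fires.)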
@@ -386,29 +291,20 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
}
} catch (Exception e) {
- LOGGER.info("We were broken from our loop: {}", e.getMessage());
- } finally {
- lock.writeLock().unlock();
+ LOGGER.warn("We were broken from our loop: {}", e.getMessage());
}
-
- }
-
- public void add(String indexName, String type, String json) {
- add(indexName, type, null, null, json);
- }
-
- public void add(String indexName, String type, String id, String json) {
- add(indexName, type, id, null, json);
}
- public void add(String indexName, String type, String id, String ts, String json)
- {
- IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(manager.getClient());
+ public void add(String indexName, String type, String id, String ts, String json) {
- indexRequestBuilder.setIndex(indexName);
- indexRequestBuilder.setType(type);
+ // make sure that these are not null
+ Preconditions.checkNotNull(indexName);
+ Preconditions.checkNotNull(type);
+ Preconditions.checkNotNull(json);
- indexRequestBuilder.setSource(json);
+ IndexRequestBuilder indexRequestBuilder = manager.getClient()
+ .prepareIndex(indexName, type)
+ .setSource(json);
// If an ID was supplied, use it; otherwise ElasticSearch will generate one.
if(id != null)
@@ -417,114 +313,109 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
if(ts != null)
indexRequestBuilder.setTimestamp(ts);
- // If there is a parentID that is associated with this bulk, then we are
- // going to have to parse the raw JSON and attempt to dereference
- // what the parent document should be
- if (parentID != null) {
- try {
- // The JSONObject constructor can throw an exception, it is called
- // out explicitly here so we can catch it.
- indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
- }
- catch(JSONException e)
- {
- LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
- totalFailed++;
- }
- }
add(indexRequestBuilder.request());
}
- public void add(UpdateRequest updateRequest) {
- Preconditions.checkNotNull(updateRequest);
- lock.writeLock().lock();
+ /**
+ * This function is trashed... needs to be fixed.
+ *
+ private synchronized void add(UpdateRequest request) {
+ Preconditions.checkNotNull(request);
checkAndCreateBulkRequest();
- checkIndexImplications(updateRequest.index());
- bulkRequest.add(updateRequest);
+
+ checkIndexImplications(request.index());
+
+ bulkRequest.add(request);
try {
Optional<Integer> size = Objects.firstNonNull(
- Optional.fromNullable(updateRequest.doc().source().length()),
- Optional.fromNullable(updateRequest.script().length()));
+ Optional.fromNullable(request.doc().source().length()),
+ Optional.fromNullable(request.script().length()));
trackItemAndBytesWritten(size.get().longValue());
} catch (NullPointerException x) {
trackItemAndBytesWritten(1000);
- } finally {
- lock.writeLock().unlock();
}
}
+ */
- public void add(IndexRequest indexRequest) {
- lock.writeLock().lock();
- checkAndCreateBulkRequest();
- checkIndexImplications(indexRequest.index());
- bulkRequest.add(indexRequest);
- try {
- trackItemAndBytesWritten(indexRequest.source().length());
- } catch (NullPointerException x) {
- LOGGER.warn("NPE adding/sizing indexrequest");
- } finally {
- lock.writeLock().unlock();
+ protected void add(IndexRequest request) {
+
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(request.index());
+
+ // If our queue is larger than our flush threshold, then we should flush the queue.
+ synchronized (this) {
+ checkIndexImplications(request.index());
+
+ bulkRequest.add(request);
+
+ this.currentBatchBytes.addAndGet(request.source().length());
+ this.currentBatchItems.incrementAndGet();
+
+ checkForFlush();
}
}
- private void trackItemAndBytesWritten(long sizeInBytes)
- {
- currentItems++;
- batchItemsSent++;
- batchSizeInBytes += sizeInBytes;
-
- // If our queue is larger than our flush threashold, then we should flush the queue.
- if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
- (currentItems >= batchSize) ) {
- flushInternal();
- this.currentItems = 0;
+ private void checkForFlush() {
+ synchronized (this) {
+ if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
+ this.currentBatchItems.get() >= this.flushThresholdsRecords ||
+ new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
+ // We should flush
+ flushInternal();
+ }
}
}
- private void checkIndexImplications(String indexName)
- {
+ private void checkIndexImplications(String indexName) {
+ // We need this to be safe across all writers that are currently being executed
+ synchronized (ElasticsearchPersistWriter.class) {
- // check to see if we have seen this index before.
- if(this.affectedIndexes.contains(indexName))
- return;
+ // this will be common if we have already verified the index.
+ if (this.affectedIndexes.contains(indexName))
+ return;
- // we haven't log this index.
- this.affectedIndexes.add(indexName);
- // Check to see if we are in 'veryLargeBulk' mode
- // if we aren't, exit early
- if(!this.veryLargeBulk)
- return;
+ // create the index if it is missing
+ createIndexIfMissing(indexName);
+ // record this index so we don't re-check it.
+ this.affectedIndexes.add(indexName);
- // They are in 'very large bulk' mode we want to turn off refreshing the index.
+ // If we are in 'veryLargeBulk' mode, suspend index refreshing for the
+ // duration of the load (it is restored in refreshIndexes on cleanUp)
+ if (this.veryLargeBulk) {
- // Create a request then add the setting to tell it to stop refreshing the interval
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+ // They are in 'very large bulk' mode; we want to turn off refreshing the index.
+ // Create a request then add the setting to tell it to stop refreshing the interval
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+ updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
+ // submit to ElasticSearch
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .updateSettings(updateSettingsRequest)
+ .actionGet();
+ }
+ }
}
public void createIndexIfMissing(String indexName) {
+ // callers synchronize on the class (see checkIndexImplications)
if (!this.manager.getClient()
.admin()
.indices()
.exists(new IndicesExistsRequest(indexName))
.actionGet()
- .isExists()) {
+ .isExists())
+ {
// It does not exist... So we are going to need to create the index.
// we are going to assume that the 'templates' that we have loaded into
// elasticsearch are sufficient to ensure the index is being created properly.
CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
if (response.isAcknowledged()) {
- LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+ LOGGER.info("Index Created: {}", indexName);
} else {
LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
LOGGER.error("Error Message: {}", response.toString());
@@ -533,22 +424,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
}
}
- public void add(String indexName, String type, Map<String, Object> toImport) {
- for (String id : toImport.keySet())
- add(indexName, type, id, (String) toImport.get(id));
- }
-
- private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
- Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
- for (String toAddId : workingBatch.keySet())
- if (!invalidIDs.contains(toAddId))
- add(index, type, toAddId, workingBatch.get(toAddId));
-
- LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
- }
-
-
+ /**
+ *
private Set<String> checkIds(Set<String> input, String index, String type) {
IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
@@ -575,102 +452,90 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
return toReturn;
}
+ */
- @Override
public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
- veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
- batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
- maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
- start();
- }
+ this.veryLargeBulk = config.getBulk() == null ?
+ Boolean.FALSE :
+ config.getBulk();
- /**
- * This method is to ONLY be called by flushInternal otherwise the counts will be off.
- * @param bulkRequest
- * @param thisSent
- * @param thisSizeInBytes
- */
- private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
- LOGGER.debug("Attempting to write {} items to ES", thisSent);
- final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
- this.addResponseFuture(responseFuture);
- responseFuture.addListener(new ActionListener<BulkResponse>() {
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- lastWrite.set(System.currentTimeMillis());
- removeResponseFuture(responseFuture);
-
- updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
- }
+ this.flushThresholdsRecords = config.getBatchSize() == null ?
+ DEFAULT_BATCH_SIZE :
+ (int)(config.getBatchSize().longValue());
- @Override
- public void onFailure(Throwable e) {
- LOGGER.error("Error bulk loading: {}", e.getMessage());
- removeResponseFuture(responseFuture);
- e.printStackTrace();
- }
- });
- }
+ this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
+ config.getMaxTimeBetweenFlushMs() :
+ DEFAULT_MAX_WAIT;
- private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
- if (bulkItemResponses.hasFailures())
- LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
+ this.flushThresholdBytes = config.getBatchBytes() == null ?
+ DEFAULT_BULK_FLUSH_THRESHOLD :
+ config.getBatchBytes();
- long thisFailed = 0;
- long thisOk = 0;
- long thisMillis = bulkItemResponses.getTookInMillis();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ public void run() {
+ checkForFlush();
+ }
+ }, this.flushThresholdTime, this.flushThresholdTime);
- // keep track of the number of totalFailed and items that we have totalOk.
- for (BulkItemResponse resp : bulkItemResponses.getItems()) {
- if (resp.isFailed())
- thisFailed++;
- else
- thisOk++;
- }
+ }
- synchronized(countLock) {
- totalAttempted += thisSent;
- totalOk += thisOk;
- totalFailed += thisFailed;
- totalSeconds += (thisMillis / 1000);
- lock.writeLock().unlock();
- }
+ private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
+ LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
- if (thisSent != (thisOk + thisFailed))
- LOGGER.error("We sent more items than this");
- LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
- MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
- MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+ // record the last time we flushed the index
+ this.lastFlush = new Date().getTime();
- }
+ // add the totals
+ this.totalSent.addAndGet(sent);
+ // add the total number of batches sent
+ this.batchesSent.incrementAndGet();
- private void checkAndCreateBulkRequest() {
- // Synchronize to ensure that we don't lose any records
- lock.writeLock().lock();
try {
- if (bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- } finally {
- lock.writeLock().unlock();
- }
- }
+ bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+ public void onResponse(BulkResponse bulkItemResponses) {
+ batchesResponded.incrementAndGet();
+ updateTotals(bulkItemResponses, sent, sizeInBytes);
+ }
- //Locking on a separate object than the writer as these objects are intended to be handled separately
- private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
- synchronized (requestLock) {
- this.responses.add(future);
+ public void onFailure(Throwable throwable) {
+ batchesResponded.incrementAndGet();
+ throwable.printStackTrace();
+ }
+ });
+ }
+ catch(Throwable e) {
+ LOGGER.error("There was an error sending the batch: {}", e.getMessage());
}
}
- //Locking on a separate object than the writer as these objects are intended to be handled separately
- private void removeResponseFuture(ListenableActionFuture<BulkResponse> future) {
- synchronized(requestLock) {
- this.responses.remove(future);
+ private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) {
+ long failed = 0;
+ long passed = 0;
+ long millis = bulkItemResponses.getTookInMillis();
+
+ // keep track of the number of totalFailed and items that we have totalOk.
+ for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+ if (resp == null || resp.isFailed())
+ failed++;
+ else
+ passed++;
}
- }
-}
+ if (failed > 0)
+ LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent);
+
+ this.totalOk.addAndGet(passed);
+ this.totalFailed.addAndGet(failed);
+ this.totalSeconds.addAndGet(millis / 1000);
+ this.totalSizeInBytes.addAndGet(sizeInBytes);
+ if (sent != (passed + failed))
+ LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed);
+
+ LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+ MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
+ MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+ }
+}
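Taken together, a minimal use of the rewritten writer might look like this (a sketch: the StreamsDatum constructor and the null argument to prepare() are assumptions based on streams-core usage, not shown in this diff):

ElasticsearchWriterConfiguration config = ElasticsearchConfigurator
        .detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch"));

ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config);
writer.prepare(null);                               // reads thresholds, starts the flush timer

writer.write(new StreamsDatum("{\"id\":\"123\"}")); // Strings are indexed as-is;
                                                    // POJOs go through the shared Jackson mapper

writer.cleanUp();                                   // flush, wait to catch up, refresh indexes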