Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/02 18:13:52 UTC

[01/12] git commit: STREAMS-113 | Added ability to convert a Twitter User object to an Activity object.

Repository: incubator-streams
Updated Branches:
  refs/heads/instagram a3f443fe7 -> 815ce2abd


STREAMS-113 | Added ability to convert a Twitter User object to an Activity object.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bc9fe492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bc9fe492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bc9fe492

Branch: refs/heads/instagram
Commit: bc9fe49270e9b64e2a8deaed7924b2ab4ff22458
Parents: 829f805
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 23 14:07:09 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 23 14:07:09 2014 -0500

----------------------------------------------------------------------
 .../twitter/processor/TwitterTypeConverter.java | 20 +++++-
 .../TwitterJsonActivitySerializer.java          | 21 ++++++
 .../TwitterJsonUserActivitySerializer.java      | 72 ++++++++++++++++++++
 .../serializer/util/TwitterActivityUtil.java    | 22 ++++++
 4 files changed, 134 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index e2119a5..0737e39 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.twitter.processor;
 
 import com.fasterxml.jackson.core.JsonParseException;
@@ -158,7 +176,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
                 node = (ObjectNode)mapper.valueToTree(item);
 
                 // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass((String)item);
+                Class inClass = TwitterEventClassifier.detectClass(item.toString());
 
                 Object out = convert(node, inClass, outClass);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
index cae2ff8..5b5be96 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.twitter.serializer;
 
 import org.apache.commons.lang.NotImplementedException;
@@ -20,6 +38,7 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
     TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer();
     TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer();
     TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer();
+    TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer();
 
     @Override
     public String serializationFormat() {
@@ -43,6 +62,8 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String>
             activity = retweetActivitySerializer.deserialize(serialized);
         else if( documentSubType == Delete.class )
             activity = deleteActivitySerializer.deserialize(serialized);
+        else if( documentSubType == User.class )
+            activity = userActivitySerializer.deserialize(serialized);
         else throw new ActivitySerializerException("unrecognized type");
 
         return activity;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
new file mode 100644
index 0000000..2ae5355
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.User;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.updateActivity;
+
+public class TwitterJsonUserActivitySerializer implements ActivitySerializer<String>, Serializable {
+
+    public TwitterJsonUserActivitySerializer() {}
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        return null;
+    }
+
+    @Override
+    public Activity deserialize(String event) throws ActivitySerializerException {
+
+        ObjectMapper mapper = new StreamsJacksonMapper();
+        User user = null;
+        try {
+            user = mapper.readValue(event, User.class);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        Activity activity = new Activity();
+        updateActivity(user, activity);
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bc9fe492/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
index feb6978..56b7005 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/util/TwitterActivityUtil.java
@@ -79,6 +79,17 @@ public class TwitterActivityUtil {
     }
 
     /**
+     * Updates the given Activity object with the values from the User
+     * @param user the object to use as the source
+     * @param activity the target of the updates.  Will receive all values from the User.
+     * @throws ActivitySerializerException
+     */
+    public static void updateActivity(User user, Activity activity) throws ActivitySerializerException {
+        activity.setActor(buildActor(user));
+        activity.setId(null);
+    }
+
+    /**
      * Updates the activity for a delete event
      * @param delete the delete event
      * @param activity the Activity object to update
@@ -163,6 +174,17 @@ public class TwitterActivityUtil {
     public static Actor buildActor(Tweet tweet) {
         Actor actor = new Actor();
         User user = tweet.getUser();
+
+        return buildActor(user);
+    }
+
+    /**
+     * Builds the activity {@link org.apache.streams.pojo.json.Actor} object from the User
+     * @param user the object to use as the source
+     * @return a valid Actor populated from the User
+     */
+    public static Actor buildActor(User user) {
+        Actor actor = new Actor();
         actor.setId(formatId(
                 Optional.fromNullable(
                         user.getIdStr())
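
A minimal usage sketch may help orient readers of this commit: the new serializer maps a raw Twitter User JSON document onto an Activity. Class and method names below come from the diffs above; the inline JSON string is a hypothetical stand-in for a real User payload.

    import org.apache.streams.pojo.json.Activity;
    import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer;

    public class UserToActivityExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical raw User JSON; a real payload carries many more fields.
            String userJson = "{\"id_str\":\"1234\",\"screen_name\":\"example\"}";

            TwitterJsonUserActivitySerializer serializer =
                    new TwitterJsonUserActivitySerializer();

            // deserialize() binds the JSON to a User, then delegates to
            // TwitterActivityUtil.updateActivity(user, activity), which in turn
            // populates the actor via the new buildActor(User) overload.
            Activity activity = serializer.deserialize(userJson);
            System.out.println(activity.getActor().getId());
        }
    }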


[07/12] git commit: Fixed build issues with PR#45

Posted by mf...@apache.org.
Fixed build issues with PR#45


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d91c4a44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d91c4a44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d91c4a44

Branch: refs/heads/instagram
Commit: d91c4a444cba9bc805e3b04fd1914e86aa8c194f
Parents: 7d65fb2
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:45:55 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:45:55 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistUpdater.java            | 35 ------------
 .../ElasticsearchPersistWriterTask.java         | 56 --------------------
 .../ElasticsearchWriterConfiguration.json       |  5 ++
 3 files changed, 5 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 6982862..b2e7556 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -36,7 +36,6 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
@@ -44,8 +43,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,44 +100,16 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         this.batchSize = batchSize;
     }
 
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
-
     private final List<String> affectedIndexes = new ArrayList<String>();
 
-    public int getTotalOutstanding() {
-        return this.totalSent - (this.totalFailed + this.totalOk);
-    }
-
     public long getFlushThresholdSizeInBytes() {
         return flushThresholdSizeInBytes;
     }
 
-    public int getTotalSent() {
-        return totalSent;
-    }
-
-    public int getTotalSeconds() {
-        return totalSeconds;
-    }
-
-    public int getTotalOk() {
-        return totalOk;
-    }
-
-    public int getTotalFailed() {
-        return totalFailed;
-    }
-
     public int getTotalBatchCount() {
         return totalBatchCount;
     }
 
-    public long getTotalSizeInBytes() {
-        return totalSizeInBytes;
-    }
-
     public long getBatchSizeInBytes() {
         return batchSizeInBytes;
     }
@@ -149,10 +118,6 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         return batchItemsSent;
     }
 
-    public List<String> getAffectedIndexes() {
-        return this.affectedIndexes;
-    }
-
     public void setFlushThresholdSizeInBytes(long sizeInBytes) {
         this.flushThresholdSizeInBytes = sizeInBytes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
deleted file mode 100644
index 824f1ac..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public class ElasticsearchPersistWriterTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
-
-    private ElasticsearchPersistWriter writer;
-
-    public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
-        this.writer = writer;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d91c4a44/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index b107be6..a38ff85 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -24,6 +24,11 @@
             "description": "Item Count before flush",
             "default": 100
         },
+		"batchBytes": {
+			"type": "integer",
+			"description": "Number of bytes before flush",
+			"default": 5242880
+		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
 		}
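
For reference, a writer configuration exercising the new field might look like the sketch below. Only these three keys appear in this hunk (connection settings live elsewhere in the schema), and the 60000 value is an arbitrary example; the 5242880-byte default is 5 MB, matching DEFAULT_BULK_FLUSH_THRESHOLD (5l * 1024l * 1024l) in ElasticsearchPersistWriter, visible in the [09/12] diff below.

    {
        "batchSize": 100,
        "batchBytes": 5242880,
        "maxTimeBetweenFlushMs": 60000
    }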


[11/12] git commit: Merge PR#42 from robdouglas 'rdouglas/STREAMS-113'

Posted by mf...@apache.org.
Merge PR#42 from robdouglas 'rdouglas/STREAMS-113'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c1246d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c1246d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c1246d96

Branch: refs/heads/instagram
Commit: c1246d963822aa9c2f8fd9408b0b781935535596
Parents: ba96627 42debdf
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 2 09:21:50 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 2 09:21:50 2014 -0400

----------------------------------------------------------------------
 .../twitter/processor/TwitterTypeConverter.java |  2 +-
 .../TwitterJsonActivitySerializer.java          |  3 +
 .../TwitterJsonUserActivitySerializer.java      | 72 ++++++++++++++++++++
 .../serializer/util/TwitterActivityUtil.java    | 22 ++++++
 4 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[10/12] git commit: Added default initial time for Sysomos

Posted by mf...@apache.org.
Added default initial time for Sysomos


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ba96627c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ba96627c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ba96627c

Branch: refs/heads/instagram
Commit: ba96627ca1eb5a52b9c44cfd0a66a7cb58539503
Parents: 31888f9
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 11:03:38 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 11:03:38 2014 -0400

----------------------------------------------------------------------
 .../apache/streams/sysomos/provider/SysomosHeartbeatStream.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ba96627c/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 946572a..8ad5ed7 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -49,7 +49,7 @@ public class SysomosHeartbeatStream implements Runnable {
     private boolean enabled = true;
 
     public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
-        this(provider, heartbeatId, OperatingMode.DATE);
+        this(provider, heartbeatId, null, DateTime.now());
     }
 
     public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, DateTime beforeTime, DateTime afterTime) {


[05/12] git commit: STREAMS-119 | Added support for isRunning

Posted by mf...@apache.org.
STREAMS-119 | Added support for isRunning


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/da4d9ed8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/da4d9ed8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/da4d9ed8

Branch: refs/heads/instagram
Commit: da4d9ed801b212d6e6f1f351b609842f14c79bba
Parents: 34c95a6
Author: mfranklin <mf...@apache.org>
Authored: Fri Jun 27 14:52:27 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Fri Jun 27 14:52:27 2014 -0400

----------------------------------------------------------------------
 .../local/tasks/StreamsProviderTask.java        | 51 +++++++++-------
 .../local/builders/LocalStreamBuilderTest.java  | 52 +++++++++++++---
 .../local/test/processors/SlowProcessor.java    | 50 ++++++++++++++++
 .../test/providers/EmptyResultSetProvider.java  |  2 +-
 .../test/providers/NumericMessageProvider.java  | 63 ++++----------------
 .../test/component/FileReaderProvider.java      | 52 ++++------------
 6 files changed, 151 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index f384380..ed95cdd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -35,7 +35,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
 
     private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class);
 
-    @Override
     public DatumStatusCounter getDatumStatusCounter() {
         return this.statusCounter;
     }
@@ -51,12 +50,13 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private static final int END = 1;
 
     private StreamsProvider provider;
-    private AtomicBoolean keepRunning;
+    private final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    private final AtomicBoolean flushing = new AtomicBoolean(false);
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private Type type;
     private BigInteger sequence;
     private DateTime[] dateRange;
     private Map<String, Object> config;
-    private AtomicBoolean isRunning;
 
     private int timeout;
     private int zeros = 0;
@@ -72,8 +72,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             this.type = Type.PERPETUAL;
         else
             this.type = Type.READ_CURRENT;
-        this.keepRunning = new AtomicBoolean(true);
-        this.isRunning = new AtomicBoolean(true);
         this.timeout = DEFAULT_TIMEOUT_MS;
     }
 
@@ -86,8 +84,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.provider = provider;
         this.type = Type.READ_NEW;
         this.sequence = sequence;
-        this.keepRunning = new AtomicBoolean(true);
-        this.isRunning = new AtomicBoolean(true);
         this.timeout = DEFAULT_TIMEOUT_MS;
     }
 
@@ -103,8 +99,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         this.dateRange = new DateTime[2];
         this.dateRange[START] = start;
         this.dateRange[END] = end;
-        this.keepRunning = new AtomicBoolean(true);
-        this.isRunning = new AtomicBoolean(true);
         this.timeout = DEFAULT_TIMEOUT_MS;
     }
 
@@ -114,6 +108,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
 
     @Override
     public void stopTask() {
+        LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName());
         this.keepRunning.set(false);
     }
 
@@ -133,12 +128,12 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
         try {
             this.provider.prepare(this.config); //TODO allow for configuration objects
             StreamsResultSet resultSet = null;
-            this.isRunning.set(true);
             long maxZeros = timeout / DEFAULT_SLEEP_TIME_MS;
             switch(this.type) {
                 case PERPETUAL: {
                     provider.startStream();
-                    while(this.keepRunning.get()) {
+                    this.started.set(true);
+                    while(this.isRunning()) {
                         try {
                             resultSet = provider.readCurrent();
                             if( resultSet.size() == 0 )
@@ -152,37 +147,51 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                                 this.keepRunning.set(false);
                             Thread.sleep(DEFAULT_SLEEP_TIME_MS);
                         } catch (InterruptedException e) {
+                            LOGGER.warn("Thread interrupted");
                             this.keepRunning.set(false);
                         }
                     }
                 }
                     break;
-                case READ_CURRENT: resultSet = this.provider.readCurrent();
+                case READ_CURRENT:
+                    resultSet = this.provider.readCurrent();
+                    this.started.set(true);
                     break;
-                case READ_NEW: resultSet = this.provider.readNew(this.sequence);
+                case READ_NEW:
+                    resultSet = this.provider.readNew(this.sequence);
+                    this.started.set(true);
                     break;
-                case READ_RANGE: resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+                case READ_RANGE:
+                    resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]);
+                    this.started.set(true);
                     break;
                 default: throw new RuntimeException("Type has not been added to StreamsProviderTask.");
             }
             flushResults(resultSet);
 
         } catch( Exception e ) {
-            e.printStackTrace();
-        } finally
-        {
+            LOGGER.error("Error in processing provider stream", e);
+        } finally {
+            LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName());
+            this.keepRunning.set(false);
             this.provider.cleanUp();
-            this.isRunning.set(false);
         }
     }
 
     @Override
     public boolean isRunning() {
-        return this.isRunning.get();
+        //We want to make sure that we never return false if it is flushing, regardless of the state of the provider
+        //or whether we have been told to shut down.  If someone really wants us to shut down, they will interrupt the
+        //thread and force us to shutdown.  We also want to make sure we have had the opportunity to run before the
+        //runtime kills us.
+        return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get());
     }
 
     public void flushResults(StreamsResultSet resultSet) {
-        for(StreamsDatum datum : resultSet) {
+        Queue<StreamsDatum> queue = resultSet.getQueue();
+        this.flushing.set(true);
+        while(!queue.isEmpty()) {
+            StreamsDatum datum = queue.poll();
             if(!this.keepRunning.get()) {
                 break;
             }
@@ -198,10 +207,12 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                 try {
                     Thread.sleep(DEFAULT_SLEEP_TIME_MS);
                 } catch (InterruptedException e) {
+                    LOGGER.warn("Thread interrupted");
                     this.keepRunning.set(false);
                 }
             }
         }
+        this.flushing.set(false);
     }
 
 }
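
To make the revised contract concrete, here is a minimal sketch of a perpetual provider that cooperates with the task above. The interface methods mirror the providers elsewhere in this commit; HeartbeatProvider itself is hypothetical.

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.core.StreamsProvider;
    import org.apache.streams.core.StreamsResultSet;
    import org.joda.time.DateTime;

    import java.math.BigInteger;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class HeartbeatProvider implements StreamsProvider {

        private final AtomicBoolean running = new AtomicBoolean(false);

        @Override
        public void startStream() {
            running.set(true);
        }

        @Override
        public StreamsResultSet readCurrent() {
            // Each poll hands back a single datum wrapping the current time.
            ConcurrentLinkedQueue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
            queue.add(new StreamsDatum(System.currentTimeMillis()));
            return new StreamsResultSet(queue);
        }

        @Override
        public StreamsResultSet readNew(BigInteger sequence) {
            return readCurrent();
        }

        @Override
        public StreamsResultSet readRange(DateTime start, DateTime end) {
            return readCurrent();
        }

        @Override
        public boolean isRunning() {
            // StreamsProviderTask.isRunning() now consults this; returning false
            // (for example after cleanUp) lets the runtime wind the stream down
            // instead of spinning forever.
            return running.get();
        }

        @Override
        public void prepare(Object configurationObject) {
        }

        @Override
        public void cleanUp() {
            running.set(false);
        }
    }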

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index ef05eb6..5a40865 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.core.test.providers.NumericMessageProvider;
 import org.apache.streams.core.test.writer.SystemOutWriter;
 import org.apache.streams.local.tasks.StreamsTask;
+import org.apache.streams.local.test.processors.SlowProcessor;
 import org.apache.streams.local.test.providers.EmptyResultSetProvider;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,6 +37,10 @@ import java.io.PrintStream;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 import static org.hamcrest.Matchers.*;
@@ -185,17 +190,24 @@ public class LocalStreamBuilderTest {
     }
 
     @Test
-    public void testDefaultProviderTimeout() {
-        long start = System.currentTimeMillis();
-        StreamBuilder builder = new LocalStreamBuilder();
-        builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
-                .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
-                .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
+    public void testSlowProcessorBranch() {
+        int numDatums = 30;
+        int timeout = 2000;
+        Map<String, Object> config = Maps.newHashMap();
+        config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
+        StreamBuilder builder = new LocalStreamBuilder(config);
+        builder.newReadCurrentStream("prov1", new NumericMessageProvider(numDatums))
+                .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
                 .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
         builder.start();
-        long end = System.currentTimeMillis();
-        //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
-        assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(StreamsTask.DEFAULT_TIMEOUT_MS), lessThanOrEqualTo(2 * (StreamsTask.DEFAULT_TIMEOUT_MS)))));
+        int count = 0;
+        Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
+        while(scanner.hasNextLine()) {
+            ++count;
+            scanner.nextLine();
+        }
+        assertThat(count, greaterThan(numDatums)); // using > because number of lines in system.out is non-deterministic
+
     }
 
     @Test
@@ -214,4 +226,26 @@ public class LocalStreamBuilderTest {
         //We care mostly that it doesn't terminate too early.  With thread shutdowns, etc, the actual time is indeterminate.  Just make sure there is an upper bound
         assertThat((int)(end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout))));
     }
+
+    @Test
+    public void ensureShutdownWithBlockedQueue() throws InterruptedException {
+        ExecutorService service = Executors.newSingleThreadExecutor();
+        int before = Thread.activeCount();
+        final StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(1));
+        builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
+                .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
+                .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
+        service.submit(new Runnable(){
+            @Override
+            public void run() {
+                builder.start();
+            }
+        });
+        //Let streams spin up threads and start to process
+        Thread.sleep(500);
+        builder.stop();
+        service.shutdownNow();
+        service.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        assertThat(Thread.activeCount(), is(equalTo(before)));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
new file mode 100644
index 0000000..93ec060
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.local.test.processors;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
+import java.util.List;
+
+/**
+ */
+public class SlowProcessor  implements StreamsProcessor {
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        return Lists.newArrayList(entry);
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index fe60295..28b9907 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -53,7 +53,7 @@ public class EmptyResultSetProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return false;
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 159355d..379459f 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.core.test.providers;
 
+import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
@@ -25,7 +26,9 @@ import org.joda.time.DateTime;
 
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
@@ -45,17 +48,17 @@ public class NumericMessageProvider implements StreamsProvider {
 
     @Override
     public StreamsResultSet readCurrent() {
-        return new ResultSet();
+        return new StreamsResultSet(constructQueue());
     }
 
     @Override
     public StreamsResultSet readNew(BigInteger sequence) {
-        return new ResultSet();
+        return new StreamsResultSet(constructQueue());
     }
 
     @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return new ResultSet();
+        return new StreamsResultSet(constructQueue());
     }
 
     @Override
@@ -73,53 +76,13 @@ public class NumericMessageProvider implements StreamsProvider {
 
     }
 
-
-    private class ResultSet extends StreamsResultSet {
-
-        private ResultSet() {
-            super(new ConcurrentLinkedQueue<StreamsDatum>());
-        }
-
-//        @Override
-//        public long getStartTime() {
-//            return 0;
-//        }
-//
-//        @Override
-//        public long getEndTime() {
-//            return 0;
-//        }
-//
-//        @Override
-//        public String getSourceId() {
-//            return null;
-//        }
-//
-//        @Override
-//        public BigInteger getMaxSequence() {
-//            return null;
-//        }
-
-        @Override
-        public Iterator<StreamsDatum> iterator() {
-            return new Iterator<StreamsDatum>() {
-                private int i = 0;
-
-                @Override
-                public boolean hasNext() {
-                    return i < numMessages;
-                }
-
-                @Override
-                public StreamsDatum next() {
-                    return new StreamsDatum(i++);
-                }
-
-                @Override
-                public void remove() {
-
-                }
-            };
+    private Queue<StreamsDatum> constructQueue() {
+        Queue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
+        for(int i=0;i<numMessages;i++) {
+            datums.add(new StreamsDatum(i));
         }
+        return datums;
     }
 }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/da4d9ed8/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 9c1ae97..1e0eb88 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -18,17 +18,13 @@
 
 package org.apache.streams.test.component;
 
+import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.joda.time.DateTime;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
 import java.math.BigInteger;
-import java.sql.ResultSet;
-import java.util.Iterator;
 import java.util.Queue;
 import java.util.Scanner;
 
@@ -42,7 +38,6 @@ import java.util.Scanner;
 public class FileReaderProvider implements StreamsProvider {
 
     private String fileName;
-    private InputStream inStream;
     private Scanner scanner;
     private StreamsDatumConverter converter;
 
@@ -58,7 +53,7 @@ public class FileReaderProvider implements StreamsProvider {
 
     @Override
     public StreamsResultSet readCurrent() {
-        return new ResultSet();
+        return new StreamsResultSet(constructQueue(this.scanner));
     }
 
     @Override
@@ -73,7 +68,7 @@ public class FileReaderProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return this.scanner.hasNextLine();
+        return this.scanner != null && this.scanner.hasNextLine();
     }
 
     @Override
@@ -83,39 +78,18 @@ public class FileReaderProvider implements StreamsProvider {
 
     @Override
     public void cleanUp() {
-        this.scanner.close();
-    }
-
-    private class ResultSet extends StreamsResultSet {
-
-        public ResultSet() {
-            super(null);
+        if(this.scanner!= null) {
+            this.scanner.close();
+            this.scanner = null;
         }
+    }
 
-
-        @Override
-        public Iterator<StreamsDatum> iterator() {
-            return new FileProviderIterator();
-        }
-
-        private class FileProviderIterator implements Iterator<StreamsDatum> {
-
-
-
-            @Override
-            public boolean hasNext() {
-                return scanner.hasNextLine();
-            }
-
-            @Override
-            public StreamsDatum next() {
-                return converter.convert(scanner.nextLine());
-            }
-
-            @Override
-            public void remove() {
-
-            }
+    private Queue<StreamsDatum> constructQueue(Scanner scanner) {
+        Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+        while(scanner.hasNextLine()) {
+            data.add(converter.convert(scanner.nextLine()));
         }
+        cleanUp();
+        return data;
     }
 }
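
Worth noting on the FileReaderProvider change: instead of exposing a lazy iterator over the Scanner, readCurrent() now eagerly drains the whole file into a queue and then calls cleanUp(), which closes and nulls the Scanner. As a result isRunning() flips to false as soon as the file has been consumed, which is precisely the signal the reworked StreamsProviderTask relies on to shut the stream down.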


[12/12] git commit: Merge branch 'master' into instagram

Posted by mf...@apache.org.
Merge branch 'master' into instagram


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/815ce2ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/815ce2ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/815ce2ab

Branch: refs/heads/instagram
Commit: 815ce2abdab1be8e279b48a7e38090f5f6948fcb
Parents: a3f443f c1246d9
Author: mfranklin <mf...@apache.org>
Authored: Wed Jul 2 12:13:35 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Wed Jul 2 12:13:35 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +
 .../ElasticsearchPersistUpdater.java            |  35 -
 .../ElasticsearchPersistWriter.java             | 705 +++++++------------
 .../ElasticsearchPersistWriterTask.java         |  56 --
 .../ElasticsearchWriterConfiguration.json       |   5 +
 .../provider/SysomosHeartbeatStream.java        |   2 +-
 .../twitter/processor/TwitterTypeConverter.java |   2 +-
 .../TwitterJsonActivitySerializer.java          |   3 +
 .../TwitterJsonUserActivitySerializer.java      |  72 ++
 .../serializer/util/TwitterActivityUtil.java    |  22 +
 .../local/tasks/StreamsProviderTask.java        |  51 +-
 .../local/builders/LocalStreamBuilderTest.java  |  52 +-
 .../local/test/processors/SlowProcessor.java    |  50 ++
 .../test/providers/EmptyResultSetProvider.java  |   2 +-
 .../test/providers/NumericMessageProvider.java  |  63 +-
 .../test/component/FileReaderProvider.java      |  52 +-
 16 files changed, 517 insertions(+), 659 deletions(-)
----------------------------------------------------------------------



[04/12] git commit: good catch Matt F.

Posted by mf...@apache.org.
good catch Matt F.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2e829d8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2e829d8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2e829d8d

Branch: refs/heads/instagram
Commit: 2e829d8dcd3deb25ec463fe5f15bb7e8c7d32edf
Parents: e0e4ad4
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri Jun 27 11:17:24 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri Jun 27 11:17:24 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/elasticsearch/ElasticsearchPersistWriter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2e829d8d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c40cbfa..5956808 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -243,7 +243,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             try {
                 Thread.yield();
                 Thread.sleep(1);
-                timeOutThresholdInMS++;
+                counter++;
             } catch(InterruptedException ie) {
                 // No Operation
             }
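
Presumably the surrounding wait loop compares counter against timeOutThresholdInMS; incrementing the threshold on every pass instead of the counter meant the exit condition could never be satisfied, so the writer could block indefinitely. The one-line fix above restores the intended timeout.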


[09/12] git commit: Updated Elasticsearch defaults with lower backoff

Posted by mf...@apache.org.
Updated Elasticsearch defaults with lower backoff


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/31888f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/31888f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/31888f99

Branch: refs/heads/instagram
Commit: 31888f99c8c17c20b5c90d9d661d9f44bff6c7c6
Parents: d91c4a4
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 10:35:31 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 10:35:31 2014 -0400

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchPersistWriter.java     | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/31888f99/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 5956808..72145e1 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -59,9 +59,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
     private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
     private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
     private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
-    private static final long WAITING_DOCS_LIMIT = 10000;
-    private static final long DEFAULT_MAX_WAIT = 10000;
     private static final int DEFAULT_BATCH_SIZE = 100;
+    //ES defaults its bulk index queue to 50 items.  We want to be under this on our backoff so set this to 1/2 ES default
+    //at a batch size as configured here.
+    private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25;
+    //A document should have to wait no more than 10s to get flushed
+    private static final long DEFAULT_MAX_WAIT = 10000;
 
     private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
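
Worked out with the defaults above: WAITING_DOCS_LIMIT = 100 * 25 = 2500 waiting documents, i.e. 25 outstanding batches of DEFAULT_BATCH_SIZE, half of the 50-request bulk queue Elasticsearch ships with, so backoff kicks in well before the cluster's bulk queue fills and starts rejecting requests.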
 


[08/12] git commit: STREAMS-113 | Code review feedback

Posted by mf...@apache.org.
STREAMS-113 | Code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/42debdf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/42debdf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/42debdf2

Branch: refs/heads/instagram
Commit: 42debdf2206fc99dc6a57f64e673a71a120adcf2
Parents: bc9fe49
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 09:29:18 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 09:29:18 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/twitter/processor/TwitterTypeConverter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/42debdf2/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 0737e39..74cce27 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -176,7 +176,7 @@ public class TwitterTypeConverter implements StreamsProcessor {
                 node = (ObjectNode)mapper.valueToTree(item);
 
                 // since data is coming from outside provider, we don't know what type the events are
-                Class inClass = TwitterEventClassifier.detectClass(item.toString());
+                Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item));
 
                 Object out = convert(node, inClass, outClass);
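
The review point behind this change: Object.toString() is not guaranteed to produce JSON, so handing it to a JSON classifier only works when the item is already a String. A small illustration, in which the Foo class is hypothetical:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ToStringVsJson {

        // Hypothetical POJO standing in for a Twitter event object.
        public static class Foo {
            public String text = "hello";
        }

        public static void main(String[] args) throws Exception {
            Foo item = new Foo();
            ObjectMapper mapper = new ObjectMapper();

            // Default Object.toString() yields something like
            // "ToStringVsJson$Foo@1b6d3586" -- not parseable as JSON.
            System.out.println(item.toString());

            // Jackson serialization yields {"text":"hello"}, which a JSON-based
            // classifier such as TwitterEventClassifier.detectClass() can inspect.
            System.out.println(mapper.writeValueAsString(item));
        }
    }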
 


[03/12] git commit: changes from comments.

Posted by mf...@apache.org.
changes from comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e0e4ad4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e0e4ad4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e0e4ad4a

Branch: refs/heads/instagram
Commit: e0e4ad4a7d1e12873b1aa1598fd7658f23462c2a
Parents: 1d36ab6
Author: Matthew Hager <Ma...@gmail.com>
Authored: Fri Jun 27 10:03:58 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Fri Jun 27 10:03:58 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 61 +-------------------
 1 file changed, 2 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e0e4ad4a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 95f5f57..c40cbfa 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -157,12 +157,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
             return docAsJson;
         else {
             ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
-            try {
-                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
-            }
-            catch(Throwable e) {
-                LOGGER.warn("Unable to write metadata");
-            }
+            node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
             return OBJECT_MAPPER.writeValueAsString(node);
         }
     }
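
The round-trip above (write the metadata Map to bytes, read it back as a tree) is what lets
the writer attach arbitrary metadata as a JSON subtree under "_metadata". A standalone
sketch of the same pattern, with a hand-built document and metadata map standing in for the
StreamsDatum:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    import java.util.HashMap;
    import java.util.Map;

    public class MetadataEmbedSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String docAsJson = "{\"id\":\"tweet-1\",\"text\":\"hello\"}";

            Map<String, Object> metadata = new HashMap<String, Object>();
            metadata.put("source", "twitter");

            // Parse the document, then graft the metadata on as a JSON subtree.
            ObjectNode node = (ObjectNode) mapper.readTree(docAsJson);
            node.put("_metadata", mapper.readTree(mapper.writeValueAsBytes(metadata)));

            // {"id":"tweet-1","text":"hello","_metadata":{"source":"twitter"}}
            System.out.println(mapper.writeValueAsString(node));
        }
    }
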
@@ -180,8 +175,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         } catch (Throwable e) {
             // this line of code should be logically unreachable.
-            LOGGER.warn("This is unexpected: {}", e.getMessage());
-            e.printStackTrace();
+            LOGGER.warn("This is unexpected: {}", e);
         }
     }
 
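When the final argument is statically typed as a Throwable, the compiler selects SLF4J's
warn(String, Throwable) overload, which logs the full stack trace but leaves any "{}"
placeholder unexpanded; that is why the message above carries no placeholder. A minimal
sketch, assuming only slf4j-api (plus a logging binding) on the classpath:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Slf4jThrowableSketch {
        private static final Logger LOGGER = LoggerFactory.getLogger(Slf4jThrowableSketch.class);

        public static void main(String[] args) {
            Throwable e = new RuntimeException("boom");

            // warn(String, Throwable): message plus the full stack trace.
            LOGGER.warn("This is unexpected", e);

            // warn(String, Object): the placeholder expands to e.toString(),
            // but the stack trace is lost.
            LOGGER.warn("This is unexpected: {}", (Object) e);
        }
    }
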
@@ -316,27 +310,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         add(indexRequestBuilder.request());
     }
 
-    /**
-     *  This function is trashed... needs to be fixed.
-     *
-    private synchronized void add(UpdateRequest request) {
-        Preconditions.checkNotNull(request);
-        checkAndCreateBulkRequest();
-
-        checkIndexImplications(request.index());
-
-        bulkRequest.add(request);
-        try {
-            Optional<Integer> size = Objects.firstNonNull(
-                    Optional.fromNullable(request.doc().source().length()),
-                    Optional.fromNullable(request.script().length()));
-            trackItemAndBytesWritten(size.get().longValue());
-        } catch (NullPointerException x) {
-            trackItemAndBytesWritten(1000);
-        }
-    }
-    */
-
     protected void add(IndexRequest request) {
 
         Preconditions.checkNotNull(request);
@@ -424,36 +397,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    /**
-     *
-    private Set<String> checkIds(Set<String> input, String index, String type) {
-
-        IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
-
-        for (String s : input)
-            idsFilterBuilder.addIds(s);
-
-        SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
-                .prepareSearch(index)
-                .setTypes(type)
-                .setQuery(idsFilterBuilder)
-                .addField("_id")
-                .setSize(input.size());
-
-        SearchHits hits = searchRequestBuilder.execute()
-                .actionGet()
-                .getHits();
-
-        Set<String> toReturn = new HashSet<String>();
-
-        for (SearchHit hit : hits) {
-            toReturn.add(hit.getId());
-        }
-
-        return toReturn;
-    }
-    */
-
     public void prepare(Object configurationObject) {
         this.veryLargeBulk = config.getBulk() == null ?
                 Boolean.FALSE :


[06/12] git commit: Merge PR#45 from Smashew 'smashew/Streams-ES-Writer-Fix'

Posted by mf...@apache.org.
Merge PR#45 from Smashew 'smashew/Streams-ES-Writer-Fix'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7d65fb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7d65fb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7d65fb26

Branch: refs/heads/instagram
Commit: 7d65fb26da0e4493eb440b43722469c8da6abb46
Parents: da4d9ed 2e829d8
Author: mfranklin <mf...@apache.org>
Authored: Mon Jun 30 08:28:13 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jun 30 08:28:13 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +
 .../ElasticsearchPersistWriter.java             | 698 +++++++------------
 2 files changed, 257 insertions(+), 445 deletions(-)
----------------------------------------------------------------------



[02/12] git commit: Fixed the ElasticSearchPersistWriter

Posted by mf...@apache.org.
Fixed the ElasticSearchPersistWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1d36ab61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1d36ab61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1d36ab61

Branch: refs/heads/instagram
Commit: 1d36ab61e1046ec8c197a9be23b3234fed4cf56c
Parents: 34c95a6
Author: Matthew Hager <Ma...@gmail.com>
Authored: Thu Jun 26 15:16:24 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Thu Jun 26 15:16:24 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +
 .../ElasticsearchPersistWriter.java             | 693 ++++++++-----------
 2 files changed, 283 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..4507fb5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -77,6 +77,10 @@ public class ElasticsearchConfigurator {
         if( elasticsearch.hasPath("batchSize"))
             elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
 
+        if( elasticsearch.hasPath("batchBytes"))
+            elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
+
+
         elasticsearchWriterConfiguration.setIndex(index);
         elasticsearchWriterConfiguration.setType(type);
         elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);

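The new batchBytes setting is read with the same hasPath guard as batchSize. A small sketch
of how such a Typesafe Config lookup behaves, using an inline HOCON string with invented
values (real streams read this from the application config under the "elasticsearch" path):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class BatchBytesConfigSketch {
        public static void main(String[] args) {
            Config elasticsearch = ConfigFactory.parseString(
                    "batchSize = 100\nbatchBytes = 5242880");

            // Only override the default when the key is actually present.
            if (elasticsearch.hasPath("batchBytes"))
                System.out.println(elasticsearch.getLong("batchBytes")); // 5242880
        }
    }
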
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index da5916b..95f5f57 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.streams.elasticsearch;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -39,321 +37,227 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.common.joda.time.DateTime;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonParser;
 
-import java.io.Closeable;
-import java.io.Flushable;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
-    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
 
-    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
+    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
     private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
     private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
     private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
     private static final long WAITING_DOCS_LIMIT = 10000;
-    private static final int BYTES_IN_MB = 1024 * 1024;
-    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
     private static final long DEFAULT_MAX_WAIT = 10000;
     private static final int DEFAULT_BATCH_SIZE = 100;
 
+    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+
     private final List<String> affectedIndexes = new ArrayList<String>();
-    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
-    //Primary lock for preventing multiple synchronous batches with the same data
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    //Create independent locks to synchronize updates that have nothing to do with actually sending data
-    private final Object countLock = new Object();
-    private final Object requestLock = new Object();
-
-    private ObjectMapper mapper = new StreamsJacksonMapper();
-    private ElasticsearchClientManager manager;
-    private ElasticsearchWriterConfiguration config;
-    private Client client;
-    private String parentID = null;
+
+    private final ElasticsearchClientManager manager;
+    private final ElasticsearchWriterConfiguration config;
+
     private BulkRequestBuilder bulkRequest;
-    private OutputStreamWriter currentWriter = null;
-    private int batchSize;
-    private long maxTimeBetweenFlushMs;
+
     private boolean veryLargeBulk = false;  // by default this setting is set to false
+    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
+    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
 
-    protected Thread task;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
-
-    private volatile int currentItems = 0;
-    private volatile int totalSent = 0;
-    private volatile int totalSeconds = 0;
-    private volatile int totalAttempted = 0;
-    private volatile int totalOk = 0;
-    private volatile int totalFailed = 0;
-    private volatile int totalBatchCount = 0;
-    private volatile int totalRecordsWritten = 0;
-    private volatile long totalSizeInBytes = 0;
-    private volatile long batchSizeInBytes = 0;
-    private volatile int batchItemsSent = 0;
-    private volatile int totalByteCount = 0;
-    private volatile int byteCount = 0;
-    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+    private long flushThresholdTime = DEFAULT_MAX_WAIT;
+    private long lastFlush = new Date().getTime();
+    private Timer timer = new Timer();
 
-    public ElasticsearchPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
-    }
 
-    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
-        this.config = config;
-    }
+    private final AtomicInteger batchesSent = new AtomicInteger(0);
+    private final AtomicInteger batchesResponded = new AtomicInteger(0);
 
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
+    private final AtomicLong currentBatchItems = new AtomicLong(0);
+    private final AtomicLong currentBatchBytes = new AtomicLong(0);
 
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
+    private final AtomicLong totalSent = new AtomicLong(0);
+    private final AtomicLong totalSeconds = new AtomicLong(0);
+    private final AtomicLong totalOk = new AtomicLong(0);
+    private final AtomicLong totalFailed = new AtomicLong(0);
+    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
 
-    public int getTotalOutstanding() {
-        return this.totalSent - (this.totalFailed + this.totalOk);
-    }
-
-    public long getFlushThresholdSizeInBytes() {
-        return flushThresholdSizeInBytes;
+    public ElasticsearchPersistWriter() {
+        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
     }
 
-    public int getTotalSent() {
-        return totalSent;
+    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
+        this(config, new ElasticsearchClientManager(config));
     }
 
-    public int getTotalSeconds() {
-        return totalSeconds;
+    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
+        this.config = config;
+        this.manager = manager;
+        this.bulkRequest = this.manager.getClient().prepareBulk();
     }
 
-    public int getTotalOk() {
-        return totalOk;
-    }
+    public long getBatchesSent()                            { return this.batchesSent.get(); }
+    public long getBatchesResponded()                       { return batchesResponded.get(); }
 
-    public int getTotalFailed() {
-        return totalFailed;
-    }
 
-    public int getTotalBatchCount() {
-        return totalBatchCount;
-    }
+    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
+    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
+    public long getFlushThresholdMaxTime()                  { return this.flushThresholdTime; }
 
-    public long getTotalSizeInBytes() {
-        return totalSizeInBytes;
-    }
+    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
+    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
+    public void setFlushThresholdMaxTime(long val)          { this.flushThresholdTime = val; }
+    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
 
-    public long getBatchSizeInBytes() {
-        return batchSizeInBytes;
-    }
+    private long getLastFlush()                             { return this.lastFlush; }
 
-    public int getBatchItemsSent() {
-        return batchItemsSent;
-    }
+    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
+    public long getTotalSent()                              { return this.totalSent.get(); }
+    public long getTotalOk()                                { return this.totalOk.get(); }
+    public long getTotalFailed()                            { return this.totalFailed.get(); }
+    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
+    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
+    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
 
-    public List<String> getAffectedIndexes() {
-        return this.affectedIndexes;
-    }
+    public boolean isConnected()                            { return (this.manager.getClient() != null); }
 
-    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
-        this.flushThresholdSizeInBytes = sizeInBytes;
-    }
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
 
-    public long getMaxTimeBetweenFlushMs() {
-        return maxTimeBetweenFlushMs;
-    }
+        checkForBackOff();
 
-    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
-        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
+        try {
+            add(config.getIndex(), config.getType(), streamsDatum.getId(),
+                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
+                    convertAndAppendMetadata(streamsDatum));
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
+        }
     }
 
-    public boolean isConnected() {
-        return (client != null);
-    }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
+    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
+        Object object = streamsDatum.getDocument();
 
-        String json;
-        String id = null;
-        String ts = null;
-        try {
-            if( streamsDatum.getId() != null ) {
-                id = streamsDatum.getId();
-            }
-            if( streamsDatum.getTimestamp() != null ) {
-                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
+        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
+        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
+            return docAsJson;
+        else {
+            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
+            try {
+                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
             }
-            if (streamsDatum.getDocument() instanceof String)
-                json = streamsDatum.getDocument().toString();
-            else {
-                json = mapper.writeValueAsString(streamsDatum.getDocument());
+            catch(Throwable e) {
+                LOGGER.warn("Unable to write metadata");
             }
-
-            add(config.getIndex(), config.getType(), id, ts, json);
-
-        } catch (Exception e) {
-            LOGGER.warn("{} {}", e.getMessage());
-            e.printStackTrace();
+            return OBJECT_MAPPER.writeValueAsString(node);
         }
     }
 
     public void cleanUp() {
-
         try {
-            flush();
-            backgroundFlushTask.shutdownNow();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        close();
-    }
 
-    @Override
-    public void close() {
-        try {
             // before they close, check to ensure that
-            this.flush();
-
-            this.lock.writeLock().lock();
-
-            int count = 0;
-            // We are going to give it 5 minutes.
-            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
-                for(ListenableActionFuture<BulkResponse> future : responses) {
-                    if(future.isDone() || future.isCancelled()) {
-                        BulkResponse response = future.get();
-                        LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
-                        updateTotals(response, 0, 0);
-                    }
-                }
-                Thread.sleep(50);
-            }
-
-            if (this.getTotalOutstanding() > 0) {
-                LOGGER.error("We never cleared our buffer");
-            }
+            flushInternal();
 
+            waitToCatchUp(0, 5 * 60 * 1000);
+            refreshIndexes();
 
-            for (String indexName : this.getAffectedIndexes()) {
-                createIndexIfMissing(indexName);
+            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
 
-                if (this.veryLargeBulk) {
-                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
-                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
-                    // refreshing back on.
-                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
+        } catch (Throwable e) {
+            // this line of code should be logically unreachable.
+            LOGGER.warn("This is unexpected: {}", e.getMessage());
+            e.printStackTrace();
+        }
+    }
 
-                    // submit to ElasticSearch
-                    this.manager.getClient()
-                            .admin()
-                            .indices()
-                            .updateSettings(updateSettingsRequest)
-                            .actionGet();
-                }
+    private void refreshIndexes() {
+        for (String indexName : this.affectedIndexes) {
 
-                checkIndexImplications(indexName);
+            if (this.veryLargeBulk) {
+                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
+                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
+                // refreshing back on.
+                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
 
-                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+                // submit to ElasticSearch
                 this.manager.getClient()
                         .admin()
                         .indices()
-                        .prepareRefresh(indexName)
-                        .execute()
+                        .updateSettings(updateSettingsRequest)
                         .actionGet();
             }
 
-            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
+            checkIndexImplications(indexName);
 
-        } catch (Exception e) {
-            // this line of code should be logically unreachable.
-            LOGGER.warn("This is unexpected: {}", e.getMessage());
-            e.printStackTrace();
-        } finally {
-            this.lock.writeLock().unlock();
+            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+            this.manager.getClient()
+                    .admin()
+                    .indices()
+                    .prepareRefresh(indexName)
+                    .execute()
+                    .actionGet();
         }
     }
 
     @Override
-    public void flush() throws IOException {
-        flushInternal();
-    }
-
-    @Override
     public DatumStatusCounter getDatumStatusCounter() {
         DatumStatusCounter counters = new DatumStatusCounter();
-        counters.incrementAttempt(this.batchItemsSent);
-        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
-        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
+        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
+        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
         return counters;
     }
 
-    public void start() {
-        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                LOGGER.debug("Checking to see if data needs to be flushed");
-                long time = System.currentTimeMillis() - lastWrite.get();
-                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
-                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
-                    flushInternal();
-                }
-            }
-        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
-        manager = new ElasticsearchClientManager(config);
-        client = manager.getClient();
-
-        LOGGER.info(client.toString());
-    }
-
-    public void flushInternal() {
-        lock.writeLock().lock();
+    private synchronized void flushInternal() {
         // we do not have a working bulk request, we can just exit here.
-        if (this.bulkRequest == null || batchItemsSent == 0)
+        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
             return;
 
+        // wait for one minute to catch up if it needs to
+        waitToCatchUp(5, 1 * 60 * 1000);
+
         // call the flush command.
-        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
 
-        // null the flush request, this will be created in the 'add' function below
-        this.bulkRequest = null;
+        // reset the current batch statistics
+        this.currentBatchItems.set(0);
+        this.currentBatchBytes.set(0);
 
-        // record the proper statistics, and add it to our totals.
-        this.totalSizeInBytes += this.batchSizeInBytes;
-        this.totalSent += batchItemsSent;
+        // reset our bulk request builder
+        this.bulkRequest = this.manager.getClient().prepareBulk();
+    }
 
-        // reset the current batch statistics
-        this.batchSizeInBytes = 0;
-        this.batchItemsSent = 0;
+    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
+        int counter = 0;
+        // If we still have 5 batches outstanding, we need to give it a minute to catch up
+        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
+            try {
+                Thread.yield();
+                Thread.sleep(1);
+                counter++;
+            } catch(InterruptedException ie) {
+                // No Operation
+            }
+        }
+    }
 
+    private void checkForBackOff() {
         try {
-            int count = 0;
             if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                 /****************************************************************************
                  * Author:
@@ -379,6 +283,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                  ****************************************************************************/
 
                 // wait for the flush to catch up. We are going to cap this at
+                int count = 0;
                 while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
                     Thread.sleep(10);
 
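The back-off above is a bounded poll: at most 500 iterations of a 10 ms sleep, roughly a
five-second cap, before the writer gives up waiting and logs a warning. The same pattern in
isolation, with the outstanding-document count stubbed out:

    public class BoundedBackOffSketch {
        private static final long WAITING_DOCS_LIMIT = 10000;

        // Stub; the real writer derives this from its sent/ok/failed counters.
        private static long getTotalOutstanding() {
            return 0;
        }

        public static void main(String[] args) throws InterruptedException {
            int count = 0;
            // 500 iterations x 10 ms: roughly a five-second cap on the wait.
            while (getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
                Thread.sleep(10);

            if (getTotalOutstanding() > WAITING_DOCS_LIMIT)
                System.err.println("Even after back-off, items are still queued.");
        }
    }
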
@@ -386,29 +291,20 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                     LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
             }
         } catch (Exception e) {
-            LOGGER.info("We were broken from our loop: {}", e.getMessage());
-        } finally {
-            lock.writeLock().unlock();
+            LOGGER.warn("We were broken from our loop: {}", e.getMessage());
         }
-
-    }
-
-    public void add(String indexName, String type, String json) {
-        add(indexName, type, null, null, json);
-    }
-
-    public void add(String indexName, String type, String id, String json) {
-        add(indexName, type, id, null, json);
     }
 
-    public void add(String indexName, String type, String id, String ts, String json)
-    {
-        IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(manager.getClient());
+    public void add(String indexName, String type, String id, String ts, String json) {
 
-        indexRequestBuilder.setIndex(indexName);
-        indexRequestBuilder.setType(type);
+        // make sure that these are not null
+        Preconditions.checkNotNull(indexName);
+        Preconditions.checkNotNull(type);
+        Preconditions.checkNotNull(json);
 
-        indexRequestBuilder.setSource(json);
+        IndexRequestBuilder indexRequestBuilder = manager.getClient()
+                .prepareIndex(indexName, type)
+                .setSource(json);
 
         // They didn't specify an ID, so we will create one for them.
         if(id != null)
@@ -417,114 +313,109 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         if(ts != null)
             indexRequestBuilder.setTimestamp(ts);
 
-        // If there is a parentID that is associated with this bulk, then we are
-        // going to have to parse the raw JSON and attempt to dereference
-        // what the parent document should be
-        if (parentID != null) {
-            try {
-                // The JSONObject constructor can throw an exception, it is called
-                // out explicitly here so we can catch it.
-                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
-            }
-            catch(JSONException e)
-            {
-                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
-                totalFailed++;
-            }
-        }
         add(indexRequestBuilder.request());
     }
 
-    public void add(UpdateRequest updateRequest) {
-        Preconditions.checkNotNull(updateRequest);
-        lock.writeLock().lock();
+    /**
+     *  This function is trashed... needs to be fixed.
+     *
+    private synchronized void add(UpdateRequest request) {
+        Preconditions.checkNotNull(request);
         checkAndCreateBulkRequest();
-        checkIndexImplications(updateRequest.index());
-        bulkRequest.add(updateRequest);
+
+        checkIndexImplications(request.index());
+
+        bulkRequest.add(request);
         try {
             Optional<Integer> size = Objects.firstNonNull(
-                    Optional.fromNullable(updateRequest.doc().source().length()),
-                    Optional.fromNullable(updateRequest.script().length()));
+                    Optional.fromNullable(request.doc().source().length()),
+                    Optional.fromNullable(request.script().length()));
             trackItemAndBytesWritten(size.get().longValue());
         } catch (NullPointerException x) {
             trackItemAndBytesWritten(1000);
-        } finally {
-            lock.writeLock().unlock();
         }
     }
+    */
 
-    public void add(IndexRequest indexRequest) {
-        lock.writeLock().lock();
-        checkAndCreateBulkRequest();
-        checkIndexImplications(indexRequest.index());
-        bulkRequest.add(indexRequest);
-        try {
-            trackItemAndBytesWritten(indexRequest.source().length());
-        } catch (NullPointerException x) {
-            LOGGER.warn("NPE adding/sizing indexrequest");
-        } finally {
-            lock.writeLock().unlock();
+    protected void add(IndexRequest request) {
+
+        Preconditions.checkNotNull(request);
+        Preconditions.checkNotNull(request.index());
+
+        // If our queue is larger than our flush threshold, then we should flush the queue.
+        synchronized (this) {
+            checkIndexImplications(request.index());
+
+            bulkRequest.add(request);
+
+            this.currentBatchBytes.addAndGet(request.source().length());
+            this.currentBatchItems.incrementAndGet();
+
+            checkForFlush();
         }
     }
 
-    private void trackItemAndBytesWritten(long sizeInBytes)
-    {
-        currentItems++;
-        batchItemsSent++;
-        batchSizeInBytes += sizeInBytes;
-
-        // If our queue is larger than our flush threashold, then we should flush the queue.
-        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
-                (currentItems >= batchSize) ) {
-            flushInternal();
-            this.currentItems = 0;
+    private void checkForFlush() {
+        synchronized (this) {
+            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
+                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
+                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
+                // We should flush
+                flushInternal();
+            }
         }
     }
 
-    private void checkIndexImplications(String indexName)
-    {
+    private void checkIndexImplications(String indexName) {
+        // We need this to be safe across all writers that are currently being executed
+        synchronized (ElasticsearchPersistWriter.class) {
 
-        // check to see if we have seen this index before.
-        if(this.affectedIndexes.contains(indexName))
-            return;
+            // this will be common if we have already verified the index.
+            if (this.affectedIndexes.contains(indexName))
+                return;
 
-        // we haven't log this index.
-        this.affectedIndexes.add(indexName);
 
-        // Check to see if we are in 'veryLargeBulk' mode
-        // if we aren't, exit early
-        if(!this.veryLargeBulk)
-            return;
+            // create the index if it is missing
+            createIndexIfMissing(indexName);
 
+            // we haven't logged this index yet.
+            this.affectedIndexes.add(indexName);
 
-        // They are in 'very large bulk' mode we want to turn off refreshing the index.
+            // Check to see if we are in 'veryLargeBulk' mode;
+            // if we are, turn off index refreshing while the bulk load runs.
+            if (this.veryLargeBulk) {
 
-        // Create a request then add the setting to tell it to stop refreshing the interval
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+                // They are in 'very large bulk' mode; we want to turn off refreshing the index.
+                // Create a request, then add the setting that stops the refresh interval.
+                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
 
-        // submit to ElasticSearch
-        this.manager.getClient()
-                .admin()
-                .indices()
-                .updateSettings(updateSettingsRequest)
-                .actionGet();
+                // submit to ElasticSearch
+                this.manager.getClient()
+                        .admin()
+                        .indices()
+                        .updateSettings(updateSettingsRequest)
+                        .actionGet();
+            }
+        }
     }
 
     public void createIndexIfMissing(String indexName) {
+        // Callers synchronize on the class (see checkIndexImplications).
         if (!this.manager.getClient()
                 .admin()
                 .indices()
                 .exists(new IndicesExistsRequest(indexName))
                 .actionGet()
-                .isExists()) {
+                .isExists())
+        {
             // It does not exist... So we are going to need to create the index.
             // we are going to assume that the 'templates' that we have loaded into
             // elasticsearch are sufficient to ensure the index is being created properly.
             CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
 
             if (response.isAcknowledged()) {
-                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+                LOGGER.info("Index Created: {}", indexName);
             } else {
                 LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
                 LOGGER.error("Error Message: {}", response.toString());
@@ -533,22 +424,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         }
     }
 
-    public void add(String indexName, String type, Map<String, Object> toImport) {
-        for (String id : toImport.keySet())
-            add(indexName, type, id, (String) toImport.get(id));
-    }
-
-    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
-        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
-        for (String toAddId : workingBatch.keySet())
-            if (!invalidIDs.contains(toAddId))
-                add(index, type, toAddId, workingBatch.get(toAddId));
-
-        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
-    }
-
-
+    /**
+     *
     private Set<String> checkIds(Set<String> input, String index, String type) {
 
         IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
@@ -575,102 +452,90 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
         return toReturn;
     }
+    */
 
-    @Override
     public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
-        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
-        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
-        start();
-    }
+        this.veryLargeBulk = config.getBulk() == null ?
+                Boolean.FALSE :
+                config.getBulk();
 
-    /**
-     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
-     * @param bulkRequest
-     * @param thisSent
-     * @param thisSizeInBytes
-     */
-    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
-        LOGGER.debug("Attempting to write {} items to ES", thisSent);
-        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
-        this.addResponseFuture(responseFuture);
-        responseFuture.addListener(new ActionListener<BulkResponse>() {
-            @Override
-            public void onResponse(BulkResponse bulkItemResponses) {
-                lastWrite.set(System.currentTimeMillis());
-                removeResponseFuture(responseFuture);
-
-                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
-            }
+        this.flushThresholdsRecords = config.getBatchSize() == null ?
+                DEFAULT_BATCH_SIZE :
+                (int)(config.getBatchSize().longValue());
 
-            @Override
-            public void onFailure(Throwable e) {
-                LOGGER.error("Error bulk loading: {}", e.getMessage());
-                removeResponseFuture(responseFuture);
-                e.printStackTrace();
-            }
-        });
-    }
+        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
+                config.getMaxTimeBetweenFlushMs() :
+                DEFAULT_MAX_WAIT;
 
-    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
-        if (bulkItemResponses.hasFailures())
-            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
+        this.flushThresholdBytes = config.getBatchBytes() == null ?
+                DEFAULT_BULK_FLUSH_THRESHOLD :
+                config.getBatchBytes();
 
-        long thisFailed = 0;
-        long thisOk = 0;
-        long thisMillis = bulkItemResponses.getTookInMillis();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            public void run() {
+                checkForFlush();
+            }
+        }, this.flushThresholdTime, this.flushThresholdTime);
 
-        // keep track of the number of totalFailed and items that we have totalOk.
-        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-            if (resp.isFailed())
-                thisFailed++;
-            else
-                thisOk++;
-        }
+    }
 
-        synchronized(countLock) {
-            totalAttempted += thisSent;
-            totalOk += thisOk;
-            totalFailed += thisFailed;
-            totalSeconds += (thisMillis / 1000);
-            lock.writeLock().unlock();
-        }
+    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
+        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
 
-        if (thisSent != (thisOk + thisFailed))
-            LOGGER.error("We sent more items than this");
 
-        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
-                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
-                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+        // record the last time we flushed the index
+        this.lastFlush = new Date().getTime();
 
-    }
+        // add the totals
+        this.totalSent.addAndGet(sent);
 
+        // add the total number of batches sent
+        this.batchesSent.incrementAndGet();
 
-    private void checkAndCreateBulkRequest() {
-        // Synchronize to ensure that we don't lose any records
-        lock.writeLock().lock();
         try {
-            if (bulkRequest == null)
-                bulkRequest = this.manager.getClient().prepareBulk();
-        } finally {
-            lock.writeLock().unlock();
-        }
-    }
+            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+                public void onResponse(BulkResponse bulkItemResponses) {
+                    batchesResponded.incrementAndGet();
+                    updateTotals(bulkItemResponses, sent, sizeInBytes);
+                }
 
-    //Locking on a separate object than the writer as these objects are intended to be handled separately
-    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
-        synchronized (requestLock) {
-            this.responses.add(future);
+                public void onFailure(Throwable throwable) {
+                    batchesResponded.incrementAndGet();
+                    throwable.printStackTrace();
+                }
+            });
+        }
+        catch(Throwable e) {
+            LOGGER.error("There was an error sending the batch: {}", e.getMessage());
         }
     }
 
-    //Locking on a separate object than the writer as these objects are intended to be handled separately
-    private void removeResponseFuture(ListenableActionFuture<BulkResponse> future) {
-        synchronized(requestLock) {
-            this.responses.remove(future);
+    private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) {
+        long failed = 0;
+        long passed = 0;
+        long millis = bulkItemResponses.getTookInMillis();
+
+        // keep track of the number of totalFailed and items that we have totalOk.
+        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+            if (resp == null || resp.isFailed())
+                failed++;
+            else
+                passed++;
         }
-    }
 
-}
+        if (failed > 0)
+            LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent);
+
+        this.totalOk.addAndGet(passed);
+        this.totalFailed.addAndGet(failed);
+        this.totalSeconds.addAndGet(millis / 1000);
+        this.totalSizeInBytes.addAndGet(sizeInBytes);
 
+        if (sent != (passed + failed))
+            LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed);
+
+        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+                MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
+                MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+    }
+}
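
End to end, the rewritten writer is driven as: construct with a configuration, prepare(),
write() per datum, then cleanUp() to flush and wait for outstanding batches. A hypothetical
usage sketch; the index and type names are invented, connection settings are omitted, and
the exact StreamsDatum constructor may differ:

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
    import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

    public class WriterUsageSketch {
        public static void main(String[] args) {
            // Hypothetical configuration; cluster/host settings would go here too.
            ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
            config.setIndex("activities");
            config.setType("activity");

            ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config);
            writer.prepare(null); // the configuration argument is unused here

            // Each datum is serialized, metadata embedded under "_metadata",
            // and buffered until a flush threshold trips.
            writer.write(new StreamsDatum("{\"id\":\"1\",\"text\":\"hello\"}"));

            // Flush the remaining batch and wait for outstanding responses.
            writer.cleanUp();
        }
    }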