You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/11 22:37:51 UTC

[01/17] git commit: writer should work without metadata

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-170 933060e42 -> 6cd97b8e8


writer should work without metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/df00e4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/df00e4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/df00e4a0

Branch: refs/heads/STREAMS-170
Commit: df00e4a06376bacda755618595922cd3382d121a
Parents: 3ee5175
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 11:32:53 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 11:32:53 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/ElasticsearchPersistWriter.java    | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/df00e4a0/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 664dd24..bfb21f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -144,9 +144,18 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         checkForBackOff();
 
-        String index = (String) streamsDatum.getMetadata().get("index");
-        String type = (String) streamsDatum.getMetadata().get("type");
-        String id = setId(streamsDatum);
+        Map<String, Object> metadata = streamsDatum.getMetadata();
+
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
+
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) streamsDatum.getMetadata().get("type");
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
 
         if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
             index = config.getIndex();


[14/17] git commit: Utility Processors to - populate datum from metadata - populate datum from metadata in document field

Posted by sb...@apache.org.
Utility Processors to
 - populate datum from metadata
 - populate datum from metadata in document field


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/aef8860f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/aef8860f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/aef8860f

Branch: refs/heads/STREAMS-170
Commit: aef8860f95002045322265e4834331b57643a50f
Parents: 8c45159
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:51:22 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:08 2014 -0500

----------------------------------------------------------------------
 .../DatumFromMetadataAsDocumentProcessor.java   | 128 +++++++++++++++++++
 .../processor/DatumFromMetadataProcessor.java   |  91 +++++++++++++
 2 files changed, 219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aef8860f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
new file mode 100644
index 0000000..cfad87e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -0,0 +1,128 @@
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 9/4/14.
+ */
+public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable {
+
+    // NOTE(review): identical to DatumFromMetadataProcessor.STREAMS_ID; looks like a copy/paste.
+    // Left unchanged because callers may already key on this value -- confirm before renaming.
+    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+
+    // Created in prepare(); process() and cleanUp() assume prepare() has already run.
+    private ElasticsearchClientManager elasticsearchClientManager;
+    // Set by every constructor; supplies the fallback index and type.
+    private ElasticsearchReaderConfiguration config;
+
+    // Assigned in prepare().
+    private ObjectMapper mapper;
+
+    /**
+     * Builds the reader configuration from the "elasticsearch" section of the global streams config.
+     */
+    public DatumFromMetadataAsDocumentProcessor() {
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    /**
+     * Builds the reader configuration detected from the supplied config.
+     *
+     * @param config configuration handed to ElasticsearchConfigurator.detectReaderConfiguration
+     */
+    public DatumFromMetadataAsDocumentProcessor(Config config) {
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    /**
+     * Uses the supplied reader configuration as-is.
+     *
+     * @param config reader configuration providing the fallback index and type
+     */
+    public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Reads "index", "type" and "id" from the JSON object stored as a string in the datum's
+     * document, fetches that document from Elasticsearch, and replaces the datum's document
+     * (and its timestamp, when a "_timestamp" field is returned) with what was fetched.
+     * Missing index/type fall back to the first configured index/type; a missing id falls
+     * back to the datum's own id.
+     *
+     * @param entry datum whose document is a JSON string naming the document to fetch
+     * @return a list holding the updated datum, or an empty list when the datum is null, has
+     *         no metadata, its document is not a parseable JSON string, or the fetch finds
+     *         no document with a non-empty source
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        // This guard has to run before entry.getDocument() is dereferenced below.
+        if(entry == null || entry.getMetadata() == null)
+            return result;
+
+        // Anything other than a String (including null) cannot be parsed as the metadata JSON,
+        // so handle it the same way as the unparseable case below.
+        if( !(entry.getDocument() instanceof String) )
+            return result;
+
+        ObjectNode metadataObjectNode;
+        try {
+            metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class);
+        } catch (IOException e) {
+            return result;
+        }
+
+        Map<String, Object> metadata = asMap(metadataObjectNode);
+
+        String index = (String) metadata.get("index");
+        String type = (String) metadata.get("type");
+        String id = (String) metadata.get("id");
+
+        if( index == null ) {
+            index = this.config.getIndexes().get(0);
+        }
+        if( type == null ) {
+            type = this.config.getTypes().get(0);
+        }
+        if( id == null ) {
+            id = entry.getId();
+        }
+
+        GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id);
+        getRequestBuilder.setFields("*", "_timestamp");
+        getRequestBuilder.setFetchSource(true);
+        GetResponse getResponse = getRequestBuilder.get();
+
+        if( getResponse == null || getResponse.isExists() == false || getResponse.isSourceEmpty() == true )
+            return result;
+
+        entry.setDocument(getResponse.getSource());
+        if( getResponse.getField("_timestamp") != null) {
+            DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue());
+            entry.setTimestamp(timestamp);
+        }
+
+        result.add(entry);
+
+        return result;
+    }
+
+    /**
+     * Creates the Elasticsearch client manager from the stored configuration and obtains the
+     * JSON mapper used by process().
+     *
+     * @param configurationObject not used by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+        mapper = StreamsJacksonMapper.getInstance();
+        mapper.registerModule(new JsonOrgModule());
+    }
+
+    /**
+     * Closes the Elasticsearch client obtained from the client manager.
+     */
+    @Override
+    public void cleanUp() {
+        this.elasticsearchClientManager.getClient().close();
+    }
+
+    /**
+     * Flattens the fields of a JSON node into a map from field name to the field value's
+     * asText() representation.
+     *
+     * @param node JSON node whose fields are copied
+     * @return a new map with one entry per field whose asText() value is non-null
+     */
+    public Map<String, Object> asMap(JsonNode node) {
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+        Map<String, Object> ret = Maps.newHashMap();
+
+        Map.Entry<String, JsonNode> entry;
+
+        while (iterator.hasNext()) {
+            entry = iterator.next();
+            if( entry.getValue().asText() != null )
+                ret.put(entry.getKey(), entry.getValue().asText());
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/aef8860f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
new file mode 100644
index 0000000..170749d
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -0,0 +1,91 @@
+package org.apache.streams.elasticsearch.processor;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Created by sblackmon on 9/4/14.
+ */
+public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable {
+
+    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+
+    private ElasticsearchClientManager elasticsearchClientManager;
+    private ElasticsearchReaderConfiguration config;
+
+    /**
+     * Configures the processor from the "elasticsearch" section of the global streams config.
+     */
+    public DatumFromMetadataProcessor() {
+        this(StreamsConfigurator.config.getConfig("elasticsearch"));
+    }
+
+    /**
+     * Configures the processor from the reader configuration detected in the given config.
+     *
+     * @param config configuration handed to ElasticsearchConfigurator.detectReaderConfiguration
+     */
+    public DatumFromMetadataProcessor(Config config) {
+        this(ElasticsearchConfigurator.detectReaderConfiguration(config));
+    }
+
+    /**
+     * Configures the processor with the given reader configuration.
+     *
+     * @param config reader configuration providing the fallback index and type
+     */
+    public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) {
+        this.config = config;
+    }
+
+    /**
+     * Looks up the Elasticsearch document named by the datum's "index", "type" and "id"
+     * metadata (falling back to the first configured index/type and the datum's own id) and
+     * stores its source, plus the "_timestamp" field when returned, on the datum.
+     *
+     * @param entry datum carrying the lookup coordinates in its metadata
+     * @return a list holding the updated datum, or an empty list when the datum or its
+     *         metadata is null or no document with a non-empty source is found
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> output = Lists.newArrayList();
+
+        if( entry != null && entry.getMetadata() != null ) {
+            GetResponse response = fetch(entry);
+
+            if( response != null && response.isExists() && !response.isSourceEmpty() ) {
+                entry.setDocument(response.getSource());
+
+                if( response.getField("_timestamp") != null ) {
+                    Long millis = (Long) response.getField("_timestamp").getValue();
+                    entry.setTimestamp(new DateTime(millis.longValue()));
+                }
+
+                output.add(entry);
+            }
+        }
+
+        return output;
+    }
+
+    /**
+     * Resolves the target index, type and id for the datum and executes the get request.
+     */
+    private GetResponse fetch(StreamsDatum entry) {
+        String targetIndex = (String) entry.getMetadata().get("index");
+        String targetType = (String) entry.getMetadata().get("type");
+        String targetId = (String) entry.getMetadata().get("id");
+
+        if( targetIndex == null )
+            targetIndex = this.config.getIndexes().get(0);
+        if( targetType == null )
+            targetType = this.config.getTypes().get(0);
+        if( targetId == null )
+            targetId = entry.getId();
+
+        return elasticsearchClientManager.getClient()
+                .prepareGet(targetIndex, targetType, targetId)
+                .setFields("*", "_timestamp")
+                .setFetchSource(true)
+                .get();
+    }
+
+    /**
+     * Creates the Elasticsearch client manager from the stored configuration.
+     *
+     * @param configurationObject not used by this implementation
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+    }
+
+    /**
+     * Closes the Elasticsearch client obtained from the client manager.
+     */
+    @Override
+    public void cleanUp() {
+        this.elasticsearchClientManager.getClient().close();
+    }
+}


[15/17] git commit: license / javadoc headers

Posted by sb...@apache.org.
license / javadoc headers


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/bfd1bc1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/bfd1bc1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/bfd1bc1d

Branch: refs/heads/STREAMS-170
Commit: bfd1bc1d1f7c58f001c6aeb7fa430e7a70f299ed
Parents: aef8860
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Sep 16 11:22:36 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:08 2014 -0500

----------------------------------------------------------------------
 .../DatumFromMetadataAsDocumentProcessor.java   | 20 +++++++++++++++++++-
 .../processor/DatumFromMetadataProcessor.java   | 20 +++++++++++++++++++-
 2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfd1bc1d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index cfad87e..5e58bb0 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.elasticsearch.processor;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -25,7 +43,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 9/4/14.
+ * Uses the index, type and id from the metadata map stored in the datum's document to fetch the current Elasticsearch document into the datum.
  */
 public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/bfd1bc1d/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index 170749d..a6e0838 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.streams.elasticsearch.processor;
 
 import com.google.common.collect.Lists;
@@ -16,7 +34,7 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * Created by sblackmon on 9/4/14.
+ * Uses the index, type and id from the datum's metadata to fetch the current Elasticsearch document into the datum.
  */
 public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable {
 


[06/17] git commit: Merge branch 'STREAMS-164'

Posted by sb...@apache.org.
Merge branch 'STREAMS-164'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b4d71241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b4d71241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b4d71241

Branch: refs/heads/STREAMS-170
Commit: b4d71241b36806ddc02ffaa90892dbf4e93f254f
Parents: 35a8fbf 5bf8755
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:44:46 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:44:46 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 41 ++++-------
 .../ElasticsearchPersistDeleter.java            | 34 +++++----
 .../ElasticsearchPersistUpdater.java            | 66 +++++------------
 .../ElasticsearchPersistWriter.java             | 76 +++++++++++++-------
 4 files changed, 98 insertions(+), 119 deletions(-)
----------------------------------------------------------------------



[11/17] git commit: STREAMS-180 | InstagramTypeConverter is now able to properly serialize UserDataItem objects into Activities.

Posted by sb...@apache.org.
STREAMS-180 | InstagramTypeConverter is now able to properly serialize UserDataItem objects into Activities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/119608d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/119608d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/119608d2

Branch: refs/heads/STREAMS-170
Commit: 119608d2f196b24cfba4db8aaab19022a7131cf5
Parents: 2397f4b
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 16:49:19 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 16:49:19 2014 -0500

----------------------------------------------------------------------
 .../processor/InstagramTypeConverter.java       |  5 +-
 .../serializer/util/InstagramActivityUtil.java  | 46 ++++++++++++++++-
 .../test/InstagramActivitySerDeTest.java        | 53 +++++++++++++++++++-
 .../src/test/resources/testUserInfoData.txt     |  2 +
 4 files changed, 102 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 90ec3a8..f0101fd 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -78,9 +78,10 @@ public class InstagramTypeConverter implements StreamsProcessor {
 
 
             } else if(item instanceof UserInfoData) {
-                activity = this.userInfoSerializer.deserialize((UserInfoData) item );
+                activity = new Activity();
+                instagramActivityUtil.updateActivity((UserInfoData) item, activity);
             }
-            if(activity != null && activity.getId() != null) {
+            if(activity != null) {
                 result = new StreamsDatum(activity);
                 count++;
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 499d0e7..c210be8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -26,6 +26,8 @@ import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.*;
 import org.jinstagram.entity.comments.CommentData;
 import org.jinstagram.entity.common.*;
+import org.jinstagram.entity.users.basicinfo.Counts;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -72,11 +74,53 @@ public class InstagramActivityUtil {
     }
 
     /**
+     * Populates the given Activity from an Instagram user profile: sets the actor built from
+     * the item, resets the activity id to null, and sets the provider. No other field of the
+     * activity is assigned here.
+     *
+     * @param item the Instagram user info to use as the source
+     * @param activity the target of the updates; its actor, id and provider are overwritten
+     * @throws ActivitySerializerException declared in the signature; presumably raised by a helper if conversion fails -- TODO confirm
+     */
+    public static void updateActivity(UserInfoData item, Activity activity) throws ActivitySerializerException {
+        activity.setActor(buildActor(item));
+        activity.setId(null);
+        activity.setProvider(getProvider());
+    }
+
+    /**
+     * Builds an Actor from an Instagram user profile.
+     *
+     * The id, image, display name, summary and url are set first so that they survive a
+     * profile without a counts object; follower/follows counts are added to the
+     * "extensions" map only when counts are present.
+     *
+     * @param item the Instagram user info to read from
+     * @return the Actor; if an exception occurs while building it, the exception is logged
+     *         and the Actor is returned with whatever fields were set before the failure
+     */
+    public static Actor buildActor(UserInfoData item) {
+        Actor actor = new Actor();
+
+        try {
+            Image image = new Image();
+            image.setUrl(item.getProfile_picture());
+
+            actor.setId(formatId(String.valueOf(item.getId())));
+            actor.setImage(image);
+            actor.setDisplayName(item.getFullName());
+            actor.setSummary(item.getBio());
+            actor.setUrl(item.getWebsite());
+
+            Map<String, Object> extensions = new HashMap<String, Object>();
+
+            // A profile without counts must not discard the rest of the actor.
+            Counts counts = item.getCounts();
+            if (counts != null) {
+                extensions.put("followers", counts.getFollwed_by());
+                extensions.put("follows", counts.getFollows());
+            }
+            extensions.put("screenName", item.getUsername());
+
+            actor.setAdditionalProperty("handle", item.getUsername());
+            actor.setAdditionalProperty("extensions", extensions);
+        } catch (Exception e) {
+            // Trailing throwable keeps the stack trace in the log alongside the message.
+            LOGGER.error("Exception trying to build actor object: {}", e.getMessage(), e);
+        }
+
+        return actor;
+    }
+
+    /**
      * Builds the actor
      * @param item the item
      * @return a valid Actor
      */
-    public static  Actor buildActor(MediaFeedData item) {
+    public static Actor buildActor(MediaFeedData item) {
         Actor actor = new Actor();
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
index 075da80..de5e466 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
@@ -24,6 +24,7 @@ import org.apache.streams.instagram.serializer.util.InstagramDeserializer;
 import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.Map;
 
 import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
 import static org.hamcrest.CoreMatchers.*;
@@ -50,7 +52,7 @@ public class InstagramActivitySerDeTest {
     private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
 
     @Test
-    public void Tests() {
+    public void TestMediaFeedObjects() {
         InstagramDeserializer instagramDeserializer = new InstagramDeserializer("");
         InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/testMediaFeedObjects.txt");
         InputStreamReader isr = new InputStreamReader(is);
@@ -85,4 +87,53 @@ public class InstagramActivitySerDeTest {
             Assert.fail();
         }
     }
+
+    /**
+     * Deserializes every non-empty line of /testUserInfoData.txt into a UserInfoData, converts
+     * it with updateActivity, and asserts on the resulting Activity and its actor.
+     */
+    @Test
+    public void TestUserInfoData() {
+        InstagramDeserializer instagramDeserializer = new InstagramDeserializer("");
+        InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/testUserInfoData.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                if(!StringUtils.isEmpty(line))
+                {
+                    LOGGER.info("raw: {}", line);
+
+                    UserInfoData userInfoData = instagramDeserializer.createObjectFromResponse(UserInfoData.class, line);
+
+                    Activity activity = new Activity();
+
+                    updateActivity(userInfoData, activity);
+
+                    // Logged after the conversion so the output shows the populated activity.
+                    LOGGER.info("activity: {}", activity.toString());
+
+                    assertThat(activity, is(not(nullValue())));
+
+                    assertThat(activity.getId(), is(nullValue()));
+                    assertThat(activity.getActor(), is(not(nullValue())));
+                    assertThat(activity.getActor().getImage(), is(not(nullValue())));
+                    assertThat(activity.getActor().getDisplayName(), is(not(nullValue())));
+                    assertThat(activity.getActor().getSummary(), is(not(nullValue())));
+
+                    Map<String, Object> extensions = (Map<String, Object>)activity.getActor().getAdditionalProperties().get("extensions");
+                    assertThat(extensions, is(not(nullValue())));
+                    assertThat(extensions.get("follows"), is(not(nullValue())));
+                    assertThat(extensions.get("followers"), is(not(nullValue())));
+                    assertThat(extensions.get("screenName"), is(not(nullValue())));
+
+                    assertThat(activity.getActor().getAdditionalProperties().get("handle"), is(not(nullValue())));
+                    assertThat(activity.getActor().getId(), is(not(nullValue())));
+                    assertThat(activity.getActor().getUrl(), is(not(nullValue())));
+                    // NOTE(review): updateActivity(UserInfoData, Activity) does not set a verb; this
+                    // presumably relies on a default value in Activity -- confirm.
+                    assertThat(activity.getVerb(), is(not(nullValue())));
+                    assertThat(activity.getProvider(), is(not(nullValue())));
+                }
+            }
+        } catch( Exception e ) {
+            // Report the cause through the logger and the failure message instead of stdout.
+            LOGGER.error("TestUserInfoData failed", e);
+            Assert.fail("Unexpected exception: " + e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/119608d2/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt b/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
new file mode 100644
index 0000000..679de91
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/resources/testUserInfoData.txt
@@ -0,0 +1,2 @@
+{"bio":"This is the official Instagram account for Speaker John Boehner, a regular guy with a big job.","counts":{"follows":72,"follwed_by":4844,"media":129},"first_name":null,"id":249235106,"last_name":null,"profile_picture":"http://images.ak.instagram.com/profiles/profile_249235106_75sq_1388678585.jpg","username":"speakerboehner","full_name":"Speaker John Boehner","website":"http://speaker.gov"}
+{"bio":"","counts":{"follows":37,"follwed_by":3083,"media":70},"first_name":null,"id":44979810,"last_name":null,"profile_picture":"http://images.ak.instagram.com/profiles/profile_44979810_75sq_1335477964.jpg","username":"senjohnmccain","full_name":"John McCain","website":"http://mccain.senate.gov"}
\ No newline at end of file


[07/17] git commit: addresses failing test in streams-processor-urls

Posted by sb...@apache.org.
addresses failing test in streams-processor-urls


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc5e7de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc5e7de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc5e7de3

Branch: refs/heads/STREAMS-170
Commit: dc5e7de3c5c2e2fe905928785aaf9b059df34747
Parents: b4d7124
Author: sblackmon <sb...@apache.org>
Authored: Wed Sep 17 16:58:50 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Wed Sep 17 16:58:50 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/streams/urls/LinkResolver.java    | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc5e7de3/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
index 2f7646b..5497b92 100644
--- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
+++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
@@ -372,6 +372,12 @@ public class LinkResolver implements Serializable {
         } catch (UnsupportedEncodingException uee) {
             System.err.println("Unable to Decode URL. Decoding skipped.");
             uee.printStackTrace();
+        } catch (NullPointerException npe) {
+            System.err.println("NPE Decoding URL. Decoding skipped.");
+            npe.printStackTrace();
+        } catch (Throwable e) {
+            System.err.println("Misc error Decoding URL. Decoding skipped.");
+            e.printStackTrace();
         }
 
         // Remove the protocol, http:// ftp:// or similar from the front


[17/17] git commit: DocumentToMetadataProcessor utility processor

Posted by sb...@apache.org.
DocumentToMetadataProcessor utility processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6cd97b8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6cd97b8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6cd97b8e

Branch: refs/heads/STREAMS-170
Commit: 6cd97b8e8a77c4a29c3f3079223a94a4ef41b877
Parents: 576b849
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Oct 1 15:42:55 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Oct 1 15:42:55 2014 -0500

----------------------------------------------------------------------
 .../DatumFromMetadataAsDocumentProcessor.java   |  17 +--
 .../processor/DocumentToMetadataProcessor.java  | 118 +++++++++++++++++++
 2 files changed, 119 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 5e58bb0..9aea4c4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -78,7 +78,7 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S
             return result;
         }
 
-        Map<String, Object> metadata = asMap(metadataObjectNode);
+        Map<String, Object> metadata = DocumentToMetadataProcessor.asMap(metadataObjectNode);
 
         if(entry == null || entry.getMetadata() == null)
             return result;
@@ -128,19 +128,4 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S
         this.elasticsearchClientManager.getClient().close();
     }
 
-    public Map<String, Object> asMap(JsonNode node) {
-
-        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
-        Map<String, Object> ret = Maps.newHashMap();
-
-        Map.Entry<String, JsonNode> entry;
-
-        while (iterator.hasNext()) {
-            entry = iterator.next();
-            if( entry.getValue().asText() != null )
-                ret.put(entry.getKey(), entry.getValue().asText());
-        }
-
-        return ret;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6cd97b8e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
new file mode 100644
index 0000000..804e1ac
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses each datum's document (a JSON string) and stores its top-level fields, as text, in the datum's metadata
+ */
+public class DocumentToMetadataProcessor implements StreamsProcessor, Serializable {
+
+    public final static String STREAMS_ID = "DatumFromMetadataProcessor"; // NOTE(review): value does not match this class's name; looks copy-pasted - confirm
+
+    private ElasticsearchClientManager elasticsearchClientManager; // created in prepare(), closed in cleanUp()
+    private ElasticsearchReaderConfiguration config; // set by every constructor
+
+    private ObjectMapper mapper; // assigned in prepare(); null until then
+
+    public DocumentToMetadataProcessor() { // default: configure from the global streams config
+        Config config = StreamsConfigurator.config.getConfig("elasticsearch"); // local shadows the field of the same name
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DocumentToMetadataProcessor(Config config) { // configure from a caller-supplied typesafe Config
+        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+    }
+
+    public DocumentToMetadataProcessor(ElasticsearchReaderConfiguration config) { // use a ready-made reader configuration as-is
+        this.config = config;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) { // returns a list holding entry with its metadata set, or an empty list if the document cannot be read as JSON
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        ObjectNode metadataObjectNode;
+        try {
+            metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class); // NOTE(review): dereferences entry before the null check below; cast requires a String document; mapper requires prepare()
+        } catch (IOException e) {
+            return result; // unreadable document: emit nothing (the exception is not logged)
+        }
+
+        Map<String, Object> metadata = asMap(metadataObjectNode);
+
+        if(entry == null || metadata == null) // NOTE(review): entry was already dereferenced above and asMap always returns a map, so this guard cannot fire
+            return result;
+
+        entry.setMetadata(metadata); // hand the parsed fields to the datum as its metadata
+
+        result.add(entry);
+
+        return result;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) { // configurationObject is not used; setup relies on the config field
+        this.elasticsearchClientManager = new ElasticsearchClientManager(config); // NOTE(review): process() never uses this client - confirm it is needed
+        mapper = StreamsJacksonMapper.getInstance();
+        mapper.registerModule(new JsonOrgModule()); // NOTE(review): mutates whatever instance getInstance() returned - confirm whether that instance is shared
+
+    }
+
+    @Override
+    public void cleanUp() { // closes the client obtained from the manager built in prepare()
+        this.elasticsearchClientManager.getClient().close();
+    }
+
+    public static Map<String, Object> asMap(JsonNode node) { // copies node's immediate fields into a new map as name -> asText() value
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+        Map<String, Object> ret = Maps.newHashMap();
+
+        Map.Entry<String, JsonNode> entry;
+
+        while (iterator.hasNext()) {
+            entry = iterator.next();
+            if( entry.getValue().asText() != null ) // NOTE(review): Jackson documents asText() as "" for object/array nodes, so nested values appear to be flattened to empty strings - confirm intended
+                ret.put(entry.getKey(), entry.getValue().asText());
+        }
+
+        return ret;
+    }
+}


[13/17] git commit: STREAMS-180 | Fixing issue where NPE could be thrown if a post has no comments

Posted by sb...@apache.org.
STREAMS-180 | Fixing issue where NPE could be thrown if a post has no comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8c45159b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8c45159b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8c45159b

Branch: refs/heads/STREAMS-170
Commit: 8c45159b0e034c533a6836bc5d23f83a02395af5
Parents: ef2e065
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 17:23:59 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 17:23:59 2014 -0500

----------------------------------------------------------------------
 .../instagram/serializer/util/InstagramActivityUtil.java      | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8c45159b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 5bbbb35..d41ea93 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -322,8 +322,11 @@ public class InstagramActivityUtil {
 
         Comments comments = item.getComments();
         String commentsConcat = "";
-        for(CommentData commentData : comments.getComments()) {
-            commentsConcat += " " + commentData.getText();
+
+        if(comments != null) {
+            for (CommentData commentData : comments.getComments()) {
+                commentsConcat += " " + commentData.getText();
+            }
         }
         if(item.getCaption() != null) {
             commentsConcat += " " + item.getCaption().getText();


[10/17] git commit: STREAMS-179 | Added javdoc comments.

Posted by sb...@apache.org.
STREAMS-179 | Added javdoc comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2397f4b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2397f4b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2397f4b4

Branch: refs/heads/STREAMS-170
Commit: 2397f4b480448ebd3e1a3c4efa4d9dc5ef64750a
Parents: e091c6c
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:24:22 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:24:22 2014 -0500

----------------------------------------------------------------------
 .../ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java        | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2397f4b4/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index f8d6343..55a26e1 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.*;
 
 /**
+ * A fixed ThreadPoolExecutor that will shut down a stream upon a thread ending execution due to an unhandled throwable.
  * @see {@link java.util.concurrent.ThreadPoolExecutor}
  */
 public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {


[03/17] git commit: changed how index, type, id get set to allow for absent metadata

Posted by sb...@apache.org.
changed how index, type, id get set to allow for absent metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428

Branch: refs/heads/STREAMS-170
Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75
Parents: 64c608f
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:36:00 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:36:00 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 48 ++++++++++++--------
 .../ElasticsearchPersistUpdater.java            | 46 ++++++++++---------
 .../ElasticsearchPersistWriter.java             | 17 -------
 3 files changed, 55 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index fece72e..9bb585c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,16 +18,15 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
@@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        index = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("index"))
-                .or(config.getIndex());
-        type = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("type"))
-                .or(config.getType());
-        id = (String) streamsDatum.getMetadata().get("id");
+        LOGGER.debug("Update Metadata: {}", metadata);
 
-        delete(index, type, id);
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
 
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) streamsDatum.getMetadata().get("type");
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        try {
+            delete(index, type, id);
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+        }
     }
 
     public void delete(String index, String type, String id) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index a60467d..b8584e5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import java.util.Map;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
@@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        Preconditions.checkNotNull(streamsDatum);
-        Preconditions.checkNotNull(streamsDatum.getDocument());
-        Preconditions.checkNotNull(streamsDatum.getMetadata());
-        Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
-
-        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
 
         LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        String index;
-        String type;
-        String id;
-        String json;
-        try {
+        Map<String, Object> metadata = streamsDatum.getMetadata();
 
-            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+        LOGGER.debug("Update Metadata: {}", metadata);
 
+        String index = null;
+        String type = null;
+        String id = streamsDatum.getId();
+
+        if( metadata != null && metadata.containsKey("index"))
             index = (String) streamsDatum.getMetadata().get("index");
+        if( metadata != null && metadata.containsKey("type"))
             type = (String) streamsDatum.getMetadata().get("type");
-            id = setId(streamsDatum);
-
-            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                index = config.getIndex();
-            }
-            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-                type = config.getType();
-            }
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) streamsDatum.getMetadata().get("id");
+
+        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+        String json;
+        try {
+
+            json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
             LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index bfb21f5..169c941 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,12 +19,8 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
@@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonParser;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
         }
     }
 
-    private String setId(StreamsDatum streamsDatum) {
-        String id = Optional.fromNullable(
-                (String) streamsDatum.getMetadata().get("id"))
-                .orNull();
-
-        if(id == null)
-            id = Optional.fromNullable(streamsDatum.getId())
-                    .orNull();
-
-        return id;
-    }
-
     private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
         Object object = streamsDatum.getDocument();
 


[04/17] git commit: more consistent logging / exception handling

Posted by sb...@apache.org.
more consistent logging / exception handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9c98a450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9c98a450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9c98a450

Branch: refs/heads/STREAMS-170
Commit: 9c98a450115f7e136d002c2df311aeebc3f860fe
Parents: 63e2d42
Author: sblackmon <sb...@apache.org>
Authored: Fri Sep 12 16:01:39 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Fri Sep 12 16:01:39 2014 -0500

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchPersistDeleter.java     | 6 +++---
 .../streams/elasticsearch/ElasticsearchPersistUpdater.java     | 6 ++----
 .../streams/elasticsearch/ElasticsearchPersistWriter.java      | 4 ++++
 3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 9bb585c..95004f5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -45,11 +45,11 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
         if(streamsDatum == null || streamsDatum.getDocument() == null)
             return;
 
-        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+        LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
 
         Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        LOGGER.debug("Update Metadata: {}", metadata);
+        LOGGER.debug("Delete Metadata: {}", metadata);
 
         String index = null;
         String type = null;
@@ -72,7 +72,7 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
         try {
             delete(index, type, id);
         } catch (Throwable e) {
-            LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage());
+            LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b8584e5..d0e8acd 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -78,10 +78,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
             update(index, type, id, json);
 
-        } catch (Exception e) {
-            LOGGER.warn("Exception: {} ", e.getMessage());
-        } catch (Error e) {
-            LOGGER.warn("Error: {} ", e.getMessage());
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9c98a450/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 169c941..ca643b9 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -139,8 +139,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         checkForBackOff();
 
+        LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
+
         Map<String, Object> metadata = streamsDatum.getMetadata();
 
+        LOGGER.debug("Write Metadata: {}", metadata);
+
         String index = null;
         String type = null;
         String id = streamsDatum.getId();


[16/17] git commit: Merge branch 'STREAMS-170' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-170

Posted by sb...@apache.org.
Merge branch 'STREAMS-170' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-170


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/576b8497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/576b8497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/576b8497

Branch: refs/heads/STREAMS-170
Commit: 576b84970959bbdaa283b541d699214ecf86d2b5
Parents: bfd1bc1 933060e
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Wed Sep 24 12:31:23 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Wed Sep 24 12:31:23 2014 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[02/17] git commit: updater should work without metadata general cleanup of configurator

Posted by sb...@apache.org.
updater should work without metadata
general cleanup of configurator


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1

Branch: refs/heads/STREAMS-170
Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be
Parents: df00e4a
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Sep 11 12:07:33 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Sep 11 12:07:33 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              | 41 ++++---------
 .../ElasticsearchPersistUpdater.java            | 62 ++++++++------------
 2 files changed, 36 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 1c66789..439b5de 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration
  */
@@ -38,39 +34,28 @@ public class ElasticsearchConfigurator {
     private final static ObjectMapper mapper = new ObjectMapper();
 
     public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
-        List<String> hosts = elasticsearch.getStringList("hosts");
-        Long port = elasticsearch.getLong("port");
-        String clusterName = elasticsearch.getString("clusterName");
 
-        ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();
+        ElasticsearchConfiguration elasticsearchConfiguration = null;
 
-        elasticsearchConfiguration.setHosts(hosts);
-        elasticsearchConfiguration.setPort(port);
-        elasticsearchConfiguration.setClusterName(clusterName);
+        try {
+            elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
+        }
 
         return elasticsearchConfiguration;
     }
 
     public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) {
 
-        ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
-        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class);
+        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null;
 
-        List<String> indexes = elasticsearch.getStringList("indexes");
-        List<String> types = elasticsearch.getStringList("types");
-
-        elasticsearchReaderConfiguration.setIndexes(indexes);
-        elasticsearchReaderConfiguration.setTypes(types);
-
-        if( elasticsearch.hasPath("_search") ) {
-            LOGGER.info("_search supplied by config");
-            Config searchConfig = elasticsearch.getConfig("_search");
-            try {
-                elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class));
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Could not parse _search supplied by config");
-            }
+        try {
+            elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchconfiguration");
         }
 
         return elasticsearchReaderConfiguration;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index dbf7d25..a60467d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,42 +18,13 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.*;
-
 //import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
@@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
         Preconditions.checkNotNull(streamsDatum.getMetadata());
         Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
 
+        LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata());
+
+        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+
         String index;
         String type;
         String id;
@@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
             json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
-            index = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("index"))
-                    .or(config.getIndex());
-            type = Optional.fromNullable(
-                    (String) streamsDatum.getMetadata().get("type"))
-                    .or(config.getType());
-            id = (String) streamsDatum.getMetadata().get("id");
+            index = (String) streamsDatum.getMetadata().get("index");
+            type = (String) streamsDatum.getMetadata().get("type");
+            id = setId(streamsDatum);
 
-            update(index, type, id, json);
+            if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                index = config.getIndex();
+            }
+            if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+                type = config.getType();
+            }
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+            LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json);
 
+            update(index, type, id, json);
+
+        } catch (Exception e) {
+            LOGGER.warn("Exception: {} ", e.getMessage());
+        } catch (Error e) {
+            LOGGER.warn("Error: {} ", e.getMessage());
         }
     }
 
@@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
                 .id(id)
                 .doc(json);
 
+        // add fields
+        updateRequest.docAsUpsert(true);
+
         add(updateRequest);
 
     }


[09/17] git commit: STREAMS-179 | Added new thread pool executor to shutdown streams

Posted by sb...@apache.org.
STREAMS-179 | Added new thread pool executor to shutdown streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc

Branch: refs/heads/STREAMS-170
Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0
Parents: 507e679
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Fri Sep 19 15:22:32 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Fri Sep 19 15:22:32 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      |   3 +-
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  45 ++++++++
 ...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 25f9fe7..bec1ff9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.builders;
 
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
+import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
 import org.apache.streams.local.tasks.StreamsProviderTask;
@@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     public void start() {
         attachShutdownHandler();
         boolean isRunning = true;
-        this.executor = Executors.newFixedThreadPool(this.totalTasks);
+        this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this);
         this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
         tasks = new HashMap<String, List<StreamsTask>>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
new file mode 100644
index 0000000..f8d6343
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ */
+public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class);
+
+    private LocalStreamBuilder streamBuilder;
+    private volatile boolean isStoped;
+
+    /**
+     * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue.
+     * @param numThreads number of threads in pool
+     * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
+     */
+    public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
+        super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+        this.streamBuilder = streamBuilder;
+        this.isStoped = false;
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        if(t != null) {
+            LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t);
+            LOGGER.error("Attempting to shut down stream.");
+            synchronized (this) {
+                if (!this.isStoped) {
+                    this.isStoped = true;
+                    this.streamBuilder.stop();
+                }
+            }
+        } else {
+            LOGGER.trace("Runnable, {}, finished executing.", r.getClass());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
new file mode 100644
index 0000000..17e8dd9
--- /dev/null
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -0,0 +1,103 @@
+package org.apache.streams.local.executors;
+
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
+
+
+    @Test
+    public void testShutDownOnException() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+                throw new RuntimeException("Testing Throwable Handling!");
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+    @Test
+    public void testNormalExecution() {
+        LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
+        final AtomicBoolean isShutdown = new AtomicBoolean(false);
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                isShutdown.set(true);
+                return null;
+            }
+        }).when(sb).stop();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                latch.countDown();
+            }
+        };
+
+        ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
+        executor.execute(runnable);
+        try {
+            latch.await();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        }
+        assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get());
+    }
+
+
+}


[08/17] git commit: Merge pull request #4 from apache/master

Posted by sb...@apache.org.
Merge pull request #4 from apache/master

Merge Apache Master

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/507e6796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/507e6796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/507e6796

Branch: refs/heads/STREAMS-170
Commit: 507e67962e97e5e6b4c1de0bc0789c25804cd6af
Parents: dca6475 dc5e7de
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Fri Sep 19 11:03:37 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Fri Sep 19 11:03:37 2014 -0500

----------------------------------------------------------------------
 README.txt                                      |    2 +-
 pom.xml                                         |    2 +-
 streams-contrib/pom.xml                         |    1 +
 .../streams/console/ConsolePersistWriter.java   |    5 +-
 .../ElasticsearchConfigurator.java              |   41 +-
 .../ElasticsearchPersistDeleter.java            |   34 +-
 .../ElasticsearchPersistUpdater.java            |   66 +-
 .../ElasticsearchPersistWriter.java             |   65 +-
 .../ElasticsearchWriterConfiguration.json       |    6 +-
 .../streams-processor-jackson/pom.xml           |   86 ++
 .../CleanAdditionalPropertiesProcessor.java     |   62 +
 .../regex/AbstractRegexExtensionExtractor.java  |   13 +-
 .../org/apache/streams/urls/LinkResolver.java   |    6 +
 .../streams-provider-datasift/pom.xml           |   21 +-
 .../streams/datasift/csdl/DatasiftCsdlUtil.java |    6 +-
 .../DatasiftTypeConverterProcessor.java         |  154 +++
 .../provider/DatasiftManagedSourceSetup.java    |   76 ++
 .../provider/DatasiftStreamConfigurator.java    |   15 +-
 .../DatasiftTypeConverterProcessor.java         |  167 ---
 .../serializer/DatasiftActivitySerializer.java  |    5 +-
 .../DatasiftDefaultActivitySerializer.java      |   13 +-
 .../DatasiftInstagramActivitySerializer.java    |  118 ++
 .../DatasiftTweetActivitySerializer.java        |    6 +
 .../main/jsonschema/com/datasift/Datasift.json  | 1103 ------------------
 .../com/datasift/DatasiftConfiguration.json     |   22 -
 .../com/datasift/DatasiftPushConfiguration.json |   17 -
 .../datasift/DatasiftStreamConfiguration.json   |   17 -
 .../com/datasift/DatasiftTwitterUser.json       |   68 --
 .../org/apache/streams/datasift/Datasift.json   |  458 ++++++++
 .../streams/datasift/DatasiftConfiguration.json |  133 +++
 .../datasift/DatasiftPushConfiguration.json     |   17 +
 .../datasift/DatasiftStreamConfiguration.json   |   17 +
 .../datasift/facebook/DatasiftFacebook.json     |  120 ++
 .../datasift/instagram/DatasiftInstagram.json   |  178 +++
 .../interaction/DatasiftInteraction.json        |   92 ++
 .../datasift/twitter/DatasiftTwitter.json       |  365 ++++++
 .../datasift/twitter/DatasiftTwitterMedia.json  |  126 ++
 .../datasift/twitter/DatasiftTwitterUser.json   |   68 ++
 .../DatasiftTypeConverterProcessorTest.java     |    5 +-
 .../DatasiftActivitySerializerTest.java         |   19 +-
 .../test/resources/instagram_datasift_json.txt  |   25 +
 .../metadata/facebook_post.mup                  |  694 +++++++++++
 .../metadata/facebook_post.png                  |  Bin 0 -> 1038039 bytes
 .../api/FacebookPostActivitySerializer.java     |   29 +-
 .../processor/FacebookTypeConverter.java        |   12 +-
 .../pagefeed/FacebookDataCollector.java         |  138 +++
 .../pagefeed/FacebookPageFeedDataCollector.java |  124 ++
 .../pagefeed/FacebookPageFeedProvider.java      |   28 +
 .../provider/pagefeed/FacebookProvider.java     |  139 +++
 .../FacebookStreamsPostSerializer.java          |   60 +
 .../streams/facebook/FacebookConfiguration.json |   33 +
 .../provider/pagefeed/TestFacebookProvider.java |   94 ++
 .../processor/InstagramTypeConverter.java       |   19 +-
 .../provider/InstagramAbstractProvider.java     |  196 ++++
 .../provider/InstagramDataCollector.java        |  145 +++
 .../provider/InstagramRecentMediaCollector.java |  171 ---
 .../provider/InstagramRecentMediaProvider.java  |  201 ----
 .../InstagramRecentMediaCollector.java          |  111 ++
 .../InstagramRecentMediaProvider.java           |   60 +
 .../userinfo/InstagramUserInfoCollector.java    |   92 ++
 .../userinfo/InstagramUserInfoProvider.java     |   39 +
 .../serializer/InstagramUserInfoSerializer.java |   83 ++
 .../InstagramRecentMediaCollectorTest.java      |  159 ---
 .../InstagramRecentMediaProviderTest.java       |  175 ---
 .../InstagramRecentMediaCollectorTest.java      |  156 +++
 .../InstagramRecentMediaProviderTest.java       |  174 +++
 .../InstagramUserInfoCollectorTest.java         |  120 ++
 .../apache/streams/data/util/MoreoverUtils.java |    2 +-
 .../SysomosBeatActivityConverter.java           |    7 +-
 .../FetchAndReplaceTwitterProcessor.java        |    4 +-
 .../twitter/provider/TwitterConfigurator.java   |   70 ++
 .../provider/TwitterStreamConfigurator.java     |   99 --
 .../twitter/provider/TwitterStreamProvider.java |    2 +-
 .../provider/TwitterTimelineProvider.java       |    8 +-
 .../TwitterUserInformationProvider.java         |    4 +-
 .../apache/streams/core/util/DatumUtils.java    |   49 +
 streams-pojo/metadata/Serializer.mup            |  371 ++++++
 streams-pojo/metadata/serializer.png            |  Bin 0 -> 604337 bytes
 .../org/apache/streams/data/util/JsonUtil.java  |    8 +-
 .../jackson/StreamsDateTimeSerializer.java      |    4 -
 .../streams/jackson/StreamsJacksonMapper.java   |   10 -
 .../jackson/StreamsPeriodDeserializer.java      |    1 -
 .../jackson/StreamsPeriodSerializer.java        |    2 -
 .../org/apache/streams/pojo/json/activity.json  |    3 +-
 .../data/data/util/DateTimeSerDeTest.java       |    2 -
 streams-runtimes/streams-runtime-local/pom.xml  |    6 +
 .../local/tasks/StreamsPersistWriterTask.java   |    2 +
 .../local/tasks/StreamsProcessorTask.java       |    6 +-
 .../local/tasks/StreamsProviderTask.java        |   19 +-
 .../local/builders/LocalStreamBuilderTest.java  |   37 +-
 .../local/builders/ToyLocalBuilderExample.java  |    6 +-
 .../streams/local/tasks/BasicTasksTest.java     |    6 +-
 .../local/tasks/StreamsProviderTaskTest.java    |  147 +++
 .../test/processors/DoNothingProcessor.java     |    2 +-
 .../PassthroughDatumCounterProcessor.java       |    2 +-
 .../test/providers/NumericMessageProvider.java  |    2 +-
 .../local/test/writer/DatumCounterWriter.java   |    2 +-
 .../local/test/writer/DoNothingWriter.java      |    2 +-
 .../local/test/writer/SystemOutWriter.java      |    2 +-
 .../apache/streams/util/SerializationUtil.java  |    4 +-
 100 files changed, 5544 insertions(+), 2419 deletions(-)
----------------------------------------------------------------------



[12/17] git commit: STREAMS-180 | Missing javadoc

Posted by sb...@apache.org.
STREAMS-180 | Missing javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ef2e065f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ef2e065f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ef2e065f

Branch: refs/heads/STREAMS-170
Commit: ef2e065f79e3b050f94ead4c37b3d04f0e6f0059
Parents: 119608d
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Sep 22 16:50:01 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Sep 22 16:50:01 2014 -0500

----------------------------------------------------------------------
 .../instagram/serializer/util/InstagramActivityUtil.java        | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ef2e065f/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index c210be8..5bbbb35 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -85,6 +85,11 @@ public class InstagramActivityUtil {
         activity.setProvider(getProvider());
     }
 
+    /**
+     * Builds an Actor object given a UserInfoData object
+     * @param item
+     * @return Actor object
+     */
     public static Actor buildActor(UserInfoData item) {
         Actor actor = new Actor();
 


[05/17] git commit: refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter

Posted by sb...@apache.org.
refactored to move duplicated code to new protected methods in ElasticsearchPersistWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bf87552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bf87552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bf87552

Branch: refs/heads/STREAMS-170
Commit: 5bf8755236ff3fe53eae7b0f131bfbf57647ba29
Parents: 9c98a45
Author: sblackmon <sb...@apache.org>
Authored: Tue Sep 16 13:59:20 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Sep 16 13:59:20 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistDeleter.java            | 20 +------
 .../ElasticsearchPersistUpdater.java            | 20 +------
 .../ElasticsearchPersistWriter.java             | 62 ++++++++++++++------
 3 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 95004f5..319cece 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl
 
         LOGGER.debug("Delete Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         try {
             delete(index, type, id);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index d0e8acd..872c65e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -51,23 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
         LOGGER.debug("Update Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         String json;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bf87552/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ca643b9..4ec3315 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -145,23 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
 
         LOGGER.debug("Write Metadata: {}", metadata);
 
-        String index = null;
-        String type = null;
-        String id = streamsDatum.getId();
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) streamsDatum.getMetadata().get("index");
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) streamsDatum.getMetadata().get("type");
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) streamsDatum.getMetadata().get("id");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+        String index = getIndex(metadata, config);
+        String type = getType(metadata, config);
+        String id = getId(streamsDatum);
 
         try {
             add(index, type, id,
@@ -506,4 +492,46 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt
                 MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
                 MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
     }
+
+    /**
+     * Resolves the index for a document. The non-null "index" entry of {@code metadata} is
+     * used unless {@code config.getForceUseConfig()} is true; otherwise {@code config.getIndex()}.
+     * @param metadata datum metadata; may be null
+     * @param config writer configuration supplying the fallback index
+     * @return the resolved index
+     */
+    protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+        String fromMetadata = (metadata != null && metadata.containsKey("index"))
+                ? (String) metadata.get("index") : null;
+        boolean useConfig = fromMetadata == null || Boolean.TRUE.equals(config.getForceUseConfig());
+        return useConfig ? config.getIndex() : fromMetadata;
+    }
+
+    /**
+     * Resolves the type for a document. The non-null "type" entry of {@code metadata} is
+     * used unless {@code config.getForceUseConfig()} is true; otherwise {@code config.getType()}.
+     *
+     * @param metadata datum metadata; may be null
+     * @param config writer configuration supplying the fallback type
+     * @return the resolved type
+     */
+    protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+        String fromMetadata = (metadata != null && metadata.containsKey("type"))
+                ? (String) metadata.get("type") : null;
+        boolean useConfig = fromMetadata == null || Boolean.TRUE.equals(config.getForceUseConfig());
+        return useConfig ? config.getType() : fromMetadata;
+    }
+
+    /**
+     * Returns the datum's own id, or the "id" entry of its metadata when the datum id is null.
+     * @param datum datum whose document id is needed
+     * @return the id, or null if neither the datum nor its metadata provides one
+     */
+    protected String getId(StreamsDatum datum) {
+        String datumId = datum.getId();
+        Map<String, Object> metadata = datum.getMetadata();
+        boolean useMetadata = datumId == null && metadata != null && metadata.containsKey("id");
+        return useMetadata ? (String) metadata.get("id") : datumId;
+    }
+
 }