You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/16 04:01:53 UTC

[01/18] git commit: Merge pull request #4 from apache/master

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-134 4e9a2bb57 -> 2f1bc4226


Merge pull request #4 from apache/master

Merge Apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2ee31575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2ee31575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2ee31575

Branch: refs/heads/STREAMS-134
Commit: 2ee31575eba481bf77c98a9edfe3d04eae333f16
Parents: 829f805 a33c215
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 23 14:03:00 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 23 14:03:00 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistReader.java  |    9 +-
 .../streams/console/ConsolePersistReader.java   |   23 +
 .../streams/console/ConsolePersistWriter.java   |   18 +
 .../elasticsearch/ElasticsearchClient.java      |   20 +-
 .../ElasticsearchClientManager.java             |   18 +
 .../ElasticsearchConfigurator.java              |   18 +
 .../ElasticsearchPersistReader.java             |   26 +-
 .../ElasticsearchPersistUpdater.java            |   18 +
 .../ElasticsearchPersistWriter.java             |    1 +
 .../ElasticsearchPersistWriterTask.java         |   18 +
 .../elasticsearch/ElasticsearchQuery.java       |   20 +-
 .../elasticsearch/PercolateProcessor.java       |   20 +-
 .../apache/streams/hbase/HbaseConfigurator.java |   18 +
 .../streams/hbase/HbasePersistWriter.java       |   18 +
 .../streams/hbase/HbasePersistWriterTask.java   |   18 +
 .../apache/streams/hdfs/HdfsConfigurator.java   |   18 +
 .../streams/hdfs/WebHdfsPersistReader.java      |   27 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   18 +
 .../streams/hdfs/WebHdfsPersistWriter.java      |   18 +
 .../streams/hdfs/WebHdfsPersistWriterTask.java  |   18 +
 .../apache/streams/kafka/KafkaConfigurator.java |   18 +
 .../streams/kafka/KafkaPersistReader.java       |   23 +
 .../streams/kafka/KafkaPersistReaderTask.java   |   18 +
 .../streams/kafka/KafkaPersistWriter.java       |   18 +
 .../streams/kafka/KafkaPersistWriterTask.java   |   18 +
 .../streams/kafka/StreamsPartitioner.java       |   18 +
 .../apache/streams/mongo/MongoConfigurator.java |   18 +
 .../streams/mongo/MongoPersistWriter.java       |   18 +
 .../apache/streams/json/JsonPathExtractor.java  |    5 +-
 .../org/apache/streams/urls/LinkResolver.java   |    3 +-
 .../urls/LinkResolverHelperFunctions.java       |    1 +
 .../streams/urls/LinkResolverProcessor.java     |    3 +-
 .../streams/urls/LinkHelperFunctionsTest.java   |   18 +
 .../streams/urls/TestLinkUnwinderProcessor.java |   18 +
 .../streams-provider-datasift/pom.xml           |   12 +-
 .../streams/datasift/csdl/DatasiftCsdlUtil.java |  114 +
 .../datasift/provider/DatasiftConverter.java    |    1 +
 .../provider/DatasiftStreamConfigurator.java    |    2 +-
 .../provider/DatasiftStreamProvider.java        |    7 +-
 .../DatasiftTypeConverterProcessor.java         |   75 +-
 .../serializer/DatasiftActivitySerializer.java  |  193 +-
 .../DatasiftDefaultActivitySerializer.java      |  211 +
 .../DatasiftTweetActivitySerializer.java        |  234 +
 .../main/jsonschema/com/datasift/Datasift.json  |   44 +-
 .../com/datasift/test/DatasiftSerDeTest.java    |   37 +-
 .../provider/DatasiftStreamProviderTest.java    |   18 +
 .../DatasiftTypeConverterProcessorTest.java     |   20 +-
 .../datasift/provider/ErrorHandlerTest.java     |   18 +
 .../datasift/provider/SubscriptionTest.java     |   18 +
 .../DatasiftActivitySerializerTest.java         |   74 +
 .../src/test/resources/amazon_datasift_json.txt |   10 +
 .../src/test/resources/blog_datasift_json.txt   |  719 +++
 .../src/test/resources/board_datasift_json.txt  | 4160 ++++++++++++++++++
 .../test/resources/facebook_datasift_json.txt   | 1843 ++++++++
 .../src/test/resources/part-r-00000.json        | 1724 ++++----
 .../resources/rand_sample_datasift_json.txt     | 1547 +++++++
 .../resources/random_sample_datasift_json.txt   | 1547 +++++++
 .../src/test/resources/reddit_datasift_json.txt |   33 +
 .../test/resources/twitter_datasift_json.txt    | 1000 +++++
 .../test/resources/wikipedia_datasift_json.txt  |  252 ++
 .../test/resources/youtube_datasift_json.txt    |    7 +
 ...FacebookPublicFeedXmlActivitySerializer.java |   18 +
 .../FacebookPostActivitySerializerTest.java     |    1 +
 .../facebook/test/FacebookPostSerDeTest.java    |   18 +
 .../facebook/test/FacebookEDCSerDeTest.java     |   18 +
 .../gnip/flickr/test/FlickrEDCSerDeTest.java    |   18 +
 .../com/gplus/api/GPlusActivitySerializer.java  |   18 +
 .../com/gplus/api/GPlusEDCAsActivityTest.java   |   18 +
 .../com/instagram/test/InstagramSerDeTest.java  |   18 +
 .../reddit/api/RedditActivitySerializer.java    |   18 +
 .../reddit/api/RedditEDCAsActivityJSONTest.java |   18 +
 .../java/com/gnip/test/YouTubeEDCSerDeTest.java |   20 +-
 .../com/gnip/test/YoutubeEDCAsActivityTest.java |    2 +-
 .../ActivityXMLActivitySerializer.java          |   18 +
 .../PowerTrackActivitySerializer.java           |   18 +
 .../test/PowerTrackDeserializationTest.java     |   18 +
 .../com/google/gmail/GMailConfigurator.java     |   18 +
 .../gmail/provider/GMailImapProviderTask.java   |   18 +
 .../GMailMessageActivitySerializer.java         |   18 +
 .../google/gmail/provider/GMailProvider.java    |   26 +-
 .../gmail/provider/GMailRssProviderTask.java    |   18 +
 .../gplus/provider/GPlusActivitySerializer.java |   18 +
 .../gplus/provider/GPlusConfigurator.java       |   18 +
 .../gplus/provider/GPlusEventProcessor.java     |   18 +
 .../provider/GPlusHistoryProviderTask.java      |   18 +
 .../google/gplus/provider/GPlusProvider.java    |   23 +
 .../gmail/test/GMailMessageSerDeTest.java       |   18 +
 .../data/MoreoverJsonActivitySerializer.java    |   18 +
 .../data/MoreoverXmlActivitySerializer.java     |   18 +
 .../streams/data/moreover/MoreoverClient.java   |   18 +
 .../data/moreover/MoreoverConfigurator.java     |   18 +
 .../streams/data/moreover/MoreoverProvider.java |   23 +
 .../data/moreover/MoreoverProviderTask.java     |   18 +
 .../streams/data/moreover/MoreoverResult.java   |   18 +
 .../data/moreover/MoreoverResultSetWrapper.java |   18 +
 .../apache/streams/data/util/MoreoverUtils.java |   18 +
 .../MoreoverJsonActivitySerializerTest.java     |    1 +
 .../data/MoreoverXmlActivitySerializerTest.java |   18 +
 .../rss/provider/RssEventClassifier.java        |   18 +
 .../streams/rss/provider/RssEventProcessor.java |   18 +
 .../rss/provider/RssStreamConfigurator.java     |   18 +
 .../streams/rss/provider/RssStreamProvider.java |   24 +
 .../rss/provider/RssStreamProviderTask.java     |   18 +
 .../serializer/SyndEntryActivitySerializer.java |   18 +
 .../streams/rss/test/Top100FeedsTest.java       |   18 +
 .../sysomos/config/SysomosConfigurator.java     |   39 +
 .../sysomos/proessor/SysomosTypeConverter.java  |   56 -
 .../sysomos/processor/SysomosTypeConverter.java |   56 +
 .../sysomos/provider/ContentRequestBuilder.java |    1 +
 .../provider/SysomosHeartbeatStream.java        |  130 +-
 .../sysomos/provider/SysomosProvider.java       |   50 +-
 .../streams/sysomos/util/SysomosUtils.java      |    2 +-
 .../com/sysomos/test/SysomosJsonSerDeTest.java  |   18 +
 .../com/sysomos/test/SysomosXmlSerDeTest.java   |   18 +
 .../processor/TwitterEventProcessor.java        |   18 +
 .../processor/TwitterProfileProcessor.java      |   31 +-
 .../twitter/processor/TwitterTypeConverter.java |   18 +
 .../twitter/provider/TwitterErrorHandler.java   |   20 +-
 .../provider/TwitterEventClassifier.java        |   18 +
 .../provider/TwitterStreamConfigurator.java     |   18 +
 .../twitter/provider/TwitterStreamProvider.java |   23 +
 .../provider/TwitterStreamProviderTask.java     |   18 +
 .../provider/TwitterTimelineProvider.java       |   32 +-
 .../provider/TwitterTimelineProviderTask.java   |   18 +
 .../TwitterUserInformationProvider.java         |   12 +-
 .../serializer/StreamsTwitterMapper.java        |   18 +
 .../TwitterJsonActivitySerializer.java          |   18 +
 .../TwitterJsonDeleteActivitySerializer.java    |   18 +
 .../TwitterJsonRetweetActivitySerializer.java   |   18 +
 .../TwitterJsonTweetActivitySerializer.java     |   18 +
 ...erJsonUserstreameventActivitySerializer.java |   18 +
 .../streams/twitter/test/SimpleTweetTest.java   |   18 +
 .../twitter/test/TweetActivitySerDeTest.java    |   18 +
 .../streams/twitter/test/TweetSerDeTest.java    |   18 +
 .../test/TwitterEventClassifierTest.java        |   18 +
 .../twitter/test/TwitterStreamProviderTest.java |   18 +
 .../org/apache/streams/core/DatumStatus.java    |   18 +
 .../streams/core/DatumStatusCountable.java      |   18 +
 .../apache/streams/core/DatumStatusCounter.java |   18 +
 .../org/apache/streams/core/StreamBuilder.java  |   18 +
 .../org/apache/streams/core/StreamHandler.java  |   18 +
 .../org/apache/streams/core/StreamState.java    |   18 +
 .../apache/streams/core/StreamsOperation.java   |   18 +
 .../apache/streams/core/StreamsProvider.java    |   38 +-
 .../apache/streams/data/util/RFC3339Utils.java  |    5 +
 .../ActivityDeserializerException.java          |   18 +
 .../exceptions/ActivitySerializerException.java |   18 +
 .../jackson/StreamsDateTimeDeserializer.java    |   20 +-
 .../jackson/StreamsDateTimeSerializer.java      |   18 +
 .../streams/jackson/StreamsJacksonMapper.java   |   22 +-
 .../streams/jackson/StreamsJacksonModule.java   |   18 +
 .../jackson/StreamsPeriodDeserializer.java      |   18 +
 .../jackson/StreamsPeriodSerializer.java        |   20 +-
 .../data/data/util/DateTimeSerDeTest.java       |   18 +
 .../local/builders/InvalidStreamException.java  |   18 +
 .../local/builders/LocalStreamBuilder.java      |   18 +
 .../streams/local/builders/StreamComponent.java |   18 +
 .../streams/local/tasks/BaseStreamsTask.java    |   18 +
 .../tasks/LocalStreamProcessMonitorThread.java  |   18 +
 .../tasks/StatusCounterMonitorRunnable.java     |   18 +
 .../local/tasks/StatusCounterMonitorThread.java |   18 +
 .../streams/local/tasks/StreamsMergeTask.java   |   18 +
 .../local/tasks/StreamsPersistWriterTask.java   |   18 +
 .../local/tasks/StreamsProcessorTask.java       |   18 +
 .../local/tasks/StreamsProviderTask.java        |   18 +
 .../apache/streams/local/tasks/StreamsTask.java |   18 +
 .../local/builders/LocalStreamBuilderTest.java  |   18 +
 .../local/builders/ToyLocalBuilderExample.java  |   18 +
 .../streams/local/tasks/BasicTasksTest.java     |   18 +
 .../test/processors/DoNothingProcessor.java     |   18 +
 .../PassthroughDatumCounterProcessor.java       |   18 +
 .../test/providers/EmptyResultSetProvider.java  |    5 +
 .../test/providers/NumericMessageProvider.java  |   23 +
 .../local/test/writer/DatumCounterWriter.java   |   18 +
 .../local/test/writer/DoNothingWriter.java      |   18 +
 .../local/test/writer/SystemOutWriter.java      |   18 +
 .../component/ExpectedDatumsPersistWriter.java  |   18 +
 .../test/component/FileReaderProvider.java      |   23 +
 .../test/component/StreamsDatumConverter.java   |   18 +
 .../component/StringToDocumentConverter.java    |   18 +
 .../tests/TestComponentsLocalStream.java        |   18 +
 .../tests/TestExpectedDatumsPersitWriter.java   |   18 +
 .../component/tests/TestFileReaderProvider.java |   18 +
 .../trident/StreamsPersistWriterState.java      |   18 +
 .../storm/trident/StreamsProcessorFunction.java |   18 +
 .../storm/trident/StreamsProviderSpout.java     |   20 +-
 .../org/apache/streams/util/ComponentUtils.java |   18 +
 .../java/org/apache/streams/util/DateUtil.java  |   18 +
 .../apache/streams/util/SerializationUtil.java  |   18 +
 189 files changed, 15837 insertions(+), 1172 deletions(-)
----------------------------------------------------------------------



[11/18] git commit: Removed commented out duplicate code

Posted by sb...@apache.org.
Removed commented out duplicate code


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/62aab459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/62aab459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/62aab459

Branch: refs/heads/STREAMS-134
Commit: 62aab4593027904adce63c1c7cc17d468d141e18
Parents: b7bf172
Author: rebanks <re...@w2odigital.com>
Authored: Tue Jul 8 14:34:46 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Tue Jul 8 14:34:46 2014 -0500

----------------------------------------------------------------------
 .../DatasiftTweetActivitySerializer.java        | 45 --------------------
 1 file changed, 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/62aab459/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 269f12a..7b9ccb3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -129,51 +129,6 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         return actor;
     }
 
-    //Need to make retweet user and tweet user the same object.
-//    public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
-//
-//        actor.setDisplayName(user.getName());
-//        actor.setId(formatId(Optional.fromNullable(
-//                user.getIdStr())
-//                .or(Optional.of(user.getId().toString()))
-//                .orNull()));
-//        actor.setSummary(user.getDescription());
-//        try {
-//            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
-//        } catch (Exception e) {
-//            LOGGER.warn("Exception trying to parse date : {}", e);
-//        }
-//
-//        if(user.getUrl() != null) {
-//            actor.setUrl(user.getUrl());
-//        }
-//
-//        Map<String, Object> extensions = new HashMap<String,Object>();
-//        extensions.put("location", user.getLocation());
-//        extensions.put("posts", user.getStatusesCount());
-//        extensions.put("followers", user.getFollowersCount());
-//        extensions.put("screenName", user.getScreenName());
-//        if(user.getAdditionalProperties() != null) {
-//            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
-//        }
-//
-//        Image profileImage = new Image();
-//        String profileUrl = null;
-//        if(actor.getImage() == null && user.getAdditionalProperties() != null) {
-//            Object url = user.getAdditionalProperties().get("profile_image_url_https");
-//            if(url instanceof String)
-//                profileUrl = (String) url;
-//        }
-//        if(actor.getImage() == null && profileUrl == null) {
-//            profileUrl = user.getProfileImageUrl();
-//        }
-//        profileImage.setUrl(profileUrl);
-//        actor.setImage(profileImage);
-//
-//        actor.setAdditionalProperty("extensions", extensions);
-//        return actor;
-//    }
-
     public void addLocationExtension(Activity activity, Twitter twitter) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = Maps.newHashMap();


[07/18] git commit: STREAMS-105 | Code review feedback

Posted by sb...@apache.org.
STREAMS-105 | Code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/26f3614f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/26f3614f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/26f3614f

Branch: refs/heads/STREAMS-134
Commit: 26f3614fd7d72be4526fc9589e75dfb32dda6588
Parents: 949b676
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:11:16 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Wed Jul 2 10:11:16 2014 -0500

----------------------------------------------------------------------
 .../datasift/serializer/DatasiftDefaultActivitySerializer.java     | 2 +-
 .../datasift/serializer/DatasiftTweetActivitySerializer.java       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26f3614f/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index 09126dc..4095df6 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -174,7 +174,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         ActivityObject actObj = new ActivityObject();
         actObj.setObjectType(interaction.getContenttype());
         actObj.setUrl(interaction.getLink());
-        actObj.setId(interaction.getId());
+        actObj.setId(formatId("post",interaction.getId()));
         actObj.setContent(interaction.getContent());
 
         return actObj;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26f3614f/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 1fecc4b..0482235 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -165,7 +165,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
         Map<String, Object> coords = Maps.newHashMap();
         coords.put("coordinates", coordiantes);
-        coords.put("type", "geo_point");
+        coords.put("type", "point");
         location.put("coordinates", coords);
         extensions.put("location", location);
     }


[02/18] git commit: Merge pull request #5 from apache/master

Posted by sb...@apache.org.
Merge pull request #5 from apache/master

Merge From Apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d71b159d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d71b159d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d71b159d

Branch: refs/heads/STREAMS-134
Commit: d71b159d1996a939cccede70b509c1767478aa72
Parents: 2ee3157 34c95a6
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Fri Jun 27 16:43:57 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Fri Jun 27 16:43:57 2014 -0500

----------------------------------------------------------------------
 pom.xml                                                         | 2 +-
 streams-contrib/streams-provider-google/google-gmail/pom.xml    | 2 +-
 .../apache/streams/sysomos/provider/SysomosHeartbeatStream.java | 4 +++-
 streams-contrib/streams-provider-twitter/pom.xml                | 5 +++--
 4 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[13/18] git commit: Merge branch 'master' into STREAMS-105

Posted by sb...@apache.org.
Merge branch 'master' into STREAMS-105

Conflicts:
	streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fbb8ccff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fbb8ccff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fbb8ccff

Branch: refs/heads/STREAMS-134
Commit: fbb8ccffc0db8de0f2d591d1e6a3251b6ff96053
Parents: abaf5b0 5c9a531
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Thu Jul 10 10:24:27 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Thu Jul 10 10:24:27 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/README.md     |  38 +
 .../ElasticsearchConfigurator.java              |  18 +
 .../ElasticsearchPersistReader.java             |   3 +-
 .../ElasticsearchPersistUpdater.java            |  35 -
 .../ElasticsearchPersistWriter.java             | 705 +++++++------------
 .../ElasticsearchPersistWriterTask.java         |  56 --
 .../elasticsearch/ElasticsearchQuery.java       |  34 +-
 .../ElasticsearchReaderConfiguration.json       |   5 +
 .../ElasticsearchWriterConfiguration.json       |   5 +
 .../src/main/resources/reference.json           |   9 +
 streams-contrib/streams-persist-mongo/README.md |  16 +
 .../streams/mongo/MongoPersistReader.java       | 270 +++++++
 .../src/main/resources/reference.json           |   8 +
 .../src/main/resources/reference.properties     |  10 -
 .../DatasiftTypeConverterProcessor.java         |  12 +-
 .../DatasiftTweetActivitySerializer.java        | 180 ++---
 .../provider/SysomosHeartbeatStream.java        |  12 +-
 .../sysomos/provider/SysomosProvider.java       |   4 +-
 .../twitter/processor/TwitterTypeConverter.java |   2 +-
 .../TwitterJsonActivitySerializer.java          |   3 +
 .../TwitterJsonUserActivitySerializer.java      |  72 ++
 .../serializer/util/TwitterActivityUtil.java    |  22 +
 .../streams/local/builders/StreamComponent.java |   2 +-
 .../local/tasks/StreamsProviderTask.java        |  54 +-
 .../local/builders/LocalStreamBuilderTest.java  |  52 +-
 .../local/test/processors/SlowProcessor.java    |  50 ++
 .../test/providers/EmptyResultSetProvider.java  |   2 +-
 .../test/providers/NumericMessageProvider.java  |  63 +-
 .../test/component/FileReaderProvider.java      |  52 +-
 29 files changed, 1024 insertions(+), 770 deletions(-)
----------------------------------------------------------------------



[10/18] git commit: Created DatasiftTwitterUser object

Posted by sb...@apache.org.
Created DatasiftTwitterUser object


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7bf172a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7bf172a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7bf172a

Branch: refs/heads/STREAMS-134
Commit: b7bf172a623fa484e8068abe46ca0d39ed693593
Parents: c50ce91
Author: rebanks <re...@w2odigital.com>
Authored: Tue Jul 8 14:33:25 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Tue Jul 8 14:33:25 2014 -0500

----------------------------------------------------------------------
 .../DatasiftTweetActivitySerializer.java        |  92 +++++-----
 .../main/jsonschema/com/datasift/Datasift.json  | 179 +------------------
 .../com/datasift/DatasiftTwitterUser.json       |  61 +++++++
 3 files changed, 113 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3406eb4..269f12a 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
 import org.apache.streams.data.util.RFC3339Utils;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.interaction.User;
+import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
 import org.apache.streams.datasift.twitter.Retweet;
 import org.apache.streams.datasift.twitter.Twitter;
 import org.apache.streams.pojo.json.Activity;
@@ -80,10 +80,10 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
     }
 
     public Actor buildActor(Datasift event, Twitter twitter) {
-        User user = twitter.getUser();
+        DatasiftTwitterUser user = twitter.getUser();
         Actor actor = super.buildActor(event.getInteraction());
         if(user == null) {
-            return retweetBuildActor(actor, twitter.getRetweet().getUser());
+            user = twitter.getRetweet().getUser();
         }
 
         actor.setDisplayName(user.getName());
@@ -130,49 +130,49 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
     }
 
     //Need to make retweet user and tweet user the same object.
-    public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
-
-        actor.setDisplayName(user.getName());
-        actor.setId(formatId(Optional.fromNullable(
-                user.getIdStr())
-                .or(Optional.of(user.getId().toString()))
-                .orNull()));
-        actor.setSummary(user.getDescription());
-        try {
-            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
-        } catch (Exception e) {
-            LOGGER.warn("Exception trying to parse date : {}", e);
-        }
-
-        if(user.getUrl() != null) {
-            actor.setUrl(user.getUrl());
-        }
-
-        Map<String, Object> extensions = new HashMap<String,Object>();
-        extensions.put("location", user.getLocation());
-        extensions.put("posts", user.getStatusesCount());
-        extensions.put("followers", user.getFollowersCount());
-        extensions.put("screenName", user.getScreenName());
-        if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
-        }
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        if(actor.getImage() == null && user.getAdditionalProperties() != null) {
-            Object url = user.getAdditionalProperties().get("profile_image_url_https");
-            if(url instanceof String)
-                profileUrl = (String) url;
-        }
-        if(actor.getImage() == null && profileUrl == null) {
-            profileUrl = user.getProfileImageUrl();
-        }
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        actor.setAdditionalProperty("extensions", extensions);
-        return actor;
-    }
+//    public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
+//
+//        actor.setDisplayName(user.getName());
+//        actor.setId(formatId(Optional.fromNullable(
+//                user.getIdStr())
+//                .or(Optional.of(user.getId().toString()))
+//                .orNull()));
+//        actor.setSummary(user.getDescription());
+//        try {
+//            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
+//        } catch (Exception e) {
+//            LOGGER.warn("Exception trying to parse date : {}", e);
+//        }
+//
+//        if(user.getUrl() != null) {
+//            actor.setUrl(user.getUrl());
+//        }
+//
+//        Map<String, Object> extensions = new HashMap<String,Object>();
+//        extensions.put("location", user.getLocation());
+//        extensions.put("posts", user.getStatusesCount());
+//        extensions.put("followers", user.getFollowersCount());
+//        extensions.put("screenName", user.getScreenName());
+//        if(user.getAdditionalProperties() != null) {
+//            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
+//        }
+//
+//        Image profileImage = new Image();
+//        String profileUrl = null;
+//        if(actor.getImage() == null && user.getAdditionalProperties() != null) {
+//            Object url = user.getAdditionalProperties().get("profile_image_url_https");
+//            if(url instanceof String)
+//                profileUrl = (String) url;
+//        }
+//        if(actor.getImage() == null && profileUrl == null) {
+//            profileUrl = user.getProfileImageUrl();
+//        }
+//        profileImage.setUrl(profileUrl);
+//        actor.setImage(profileImage);
+//
+//        actor.setAdditionalProperty("extensions", extensions);
+//        return actor;
+//    }
 
     public void addLocationExtension(Activity activity, Twitter twitter) {
         Map<String, Object> extensions = ensureExtensions(activity);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index e7b664a..8d7ff3a 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -947,62 +947,8 @@
                             "type": "string"
                         },
                         "user": {
-                            "javaType": "org.apache.streams.datasift.twitter.User",
-                            "type": "object",
-                            "dynamic": "true",
-                            "properties": {
-                                "created_at": {
-                                    "type": "string"
-                                },
-                                "description": {
-                                    "type": "string"
-                                },
-                                "followers_count": {
-                                    "type": "integer"
-                                },
-                                "friends_count": {
-                                    "type": "integer"
-                                },
-                                "geo_enabled": {
-                                    "type": "boolean"
-                                },
-                                "id": {
-                                    "type": "integer"
-                                },
-                                "id_str": {
-                                    "type": "string"
-                                },
-                                "lang": {
-                                    "type": "string"
-                                },
-                                "listed_count": {
-                                    "type": "integer"
-                                },
-                                "location": {
-                                    "type": "string"
-                                },
-                                "name": {
-                                    "type": "string"
-                                },
-                                "profile_image_url": {
-                                    "type": "string"
-                                },
-                                "screen_name": {
-                                    "type": "string"
-                                },
-                                "statuses_count": {
-                                    "type": "integer"
-                                },
-                                "time_zone": {
-                                    "type": "string"
-                                },
-                                "url": {
-                                    "type": "string"
-                                },
-                                "utc_offset": {
-                                    "type": "integer"
-                                }
-                            }
+                            "type": "DatasiftTwitterUser",
+                            "$ref": "./DatasiftTwitterUser.json"
                         }
                     }
                 },
@@ -1062,64 +1008,8 @@
                             "type": "string"
                         },
                         "user": {
-                            "javaType": "org.apache.streams.datasift.twitter.User",
-                            "dynamic": "true",
-                            "properties": {
-                                "created_at": {
-                                    "type": "string"
-                                },
-                                "description": {
-                                    "type": "string"
-                                },
-                                "followers_count": {
-                                    "type": "integer"
-                                },
-                                "friends_count": {
-                                    "type": "integer"
-                                },
-                                "geo_enabled": {
-                                    "type": "boolean"
-                                },
-                                "id": {
-                                    "type": "integer"
-                                },
-                                "id_str": {
-                                    "type": "string"
-                                },
-                                "lang": {
-                                    "type": "string"
-                                },
-                                "listed_count": {
-                                    "type": "integer"
-                                },
-                                "location": {
-                                    "type": "string"
-                                },
-                                "name": {
-                                    "type": "string"
-                                },
-                                "profile_image_url": {
-                                    "type": "string"
-                                },
-                                "screen_name": {
-                                    "type": "string"
-                                },
-                                "statuses_count": {
-                                    "type": "integer"
-                                },
-                                "time_zone": {
-                                    "type": "string"
-                                },
-                                "url": {
-                                    "type": "string"
-                                },
-                                "utc_offset": {
-                                    "type": "integer"
-                                },
-                                "verified": {
-                                    "type": "boolean"
-                                }
-                            }
+                            "type": "DatasiftTwitterUser",
+                            "$ref": "./DatasiftTwitterUser.json"
                         }
                     }
                 },
@@ -1130,65 +1020,8 @@
                     "type": "string"
                 },
                 "user": {
-                    "javaType": "org.apache.streams.datasift.interaction.User",
-                    "type": "object",
-                    "dynamic": "true",
-                    "properties": {
-                        "created_at": {
-                            "type": "string"
-                        },
-                        "description": {
-                            "type": "string"
-                        },
-                        "followers_count": {
-                            "type": "integer"
-                        },
-                        "friends_count": {
-                            "type": "integer"
-                        },
-                        "geo_enabled": {
-                            "type": "boolean"
-                        },
-                        "id": {
-                            "type": "integer"
-                        },
-                        "id_str": {
-                            "type": "string"
-                        },
-                        "lang": {
-                            "type": "string"
-                        },
-                        "listed_count": {
-                            "type": "integer"
-                        },
-                        "location": {
-                            "type": "string"
-                        },
-                        "name": {
-                            "type": "string"
-                        },
-                        "profile_image_url": {
-                            "type": "string"
-                        },
-                        "screen_name": {
-                            "type": "string"
-                        },
-                        "statuses_count": {
-                            "type": "integer"
-                        },
-                        "time_zone": {
-                            "type": "string"
-                        },
-                        "url": {
-                            "type": "string"
-                        },
-                        "utc_offset": {
-                            "type": "integer"
-                        },
-                        "verified": {
-                            "type": "boolean"
-                        }
-                    }
+                    "type": "DatasiftTwitterUser",
+                    "$ref": "./DatasiftTwitterUser.json"
                 }
             }
         },

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
new file mode 100644
index 0000000..97d93fe
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
@@ -0,0 +1,61 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "javaType": "org.apache.streams.datasift.twitter.DatasiftTwitterUser",
+    "properties": {
+        "created_at": {
+            "type": "string"
+        },
+        "description": {
+            "type": "string"
+        },
+        "followers_count": {
+            "type": "integer"
+        },
+        "friends_count": {
+            "type": "integer"
+        },
+        "geo_enabled": {
+            "type": "boolean"
+        },
+        "id": {
+            "type": "integer"
+        },
+        "id_str": {
+            "type": "string"
+        },
+        "lang": {
+            "type": "string"
+        },
+        "listed_count": {
+            "type": "integer"
+        },
+        "location": {
+            "type": "string"
+        },
+        "name": {
+            "type": "string"
+        },
+        "profile_image_url": {
+            "type": "string"
+        },
+        "screen_name": {
+            "type": "string"
+        },
+        "statuses_count": {
+            "type": "integer"
+        },
+        "time_zone": {
+            "type": "string"
+        },
+        "url": {
+            "type": "string"
+        },
+        "utc_offset": {
+            "type": "integer"
+        },
+        "verified": {
+            "type": "boolean"
+        }
+    }
+}
\ No newline at end of file


[06/18] git commit: Merge pull request #6 from apache/master

Posted by sb...@apache.org.
Merge pull request #6 from apache/master

Merge Apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3b98df84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3b98df84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3b98df84

Branch: refs/heads/STREAMS-134
Commit: 3b98df844ca87e79bf9cb88f16fbad361d9959ad
Parents: d71b159 ba96627
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Tue Jul 1 09:56:26 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Tue Jul 1 09:56:26 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +
 .../ElasticsearchPersistUpdater.java            |  35 -
 .../ElasticsearchPersistWriter.java             | 705 +++++++------------
 .../ElasticsearchPersistWriterTask.java         |  56 --
 .../ElasticsearchWriterConfiguration.json       |   5 +
 .../provider/SysomosHeartbeatStream.java        |   2 +-
 .../local/tasks/StreamsProviderTask.java        |  51 +-
 .../local/builders/LocalStreamBuilderTest.java  |  52 +-
 .../local/test/processors/SlowProcessor.java    |  50 ++
 .../test/providers/EmptyResultSetProvider.java  |   2 +-
 .../test/providers/NumericMessageProvider.java  |  63 +-
 .../test/component/FileReaderProvider.java      |  52 +-
 12 files changed, 419 insertions(+), 658 deletions(-)
----------------------------------------------------------------------



[14/18] git commit: Merge PR50 for 'rbnks/STREAMS-127'

Posted by sb...@apache.org.
Merge PR50 for  'rbnks/STREAMS-127'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/853ae3c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/853ae3c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/853ae3c7

Branch: refs/heads/STREAMS-134
Commit: 853ae3c7ae708fecea790556ac3bfb6bc7078117
Parents: 38ed41a 62aab45
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 12:35:28 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 12:35:28 2014 -0400

----------------------------------------------------------------------
 .../DatasiftTweetActivitySerializer.java        |  51 +-----
 .../main/jsonschema/com/datasift/Datasift.json  | 179 +------------------
 .../com/datasift/DatasiftTwitterUser.json       |  61 +++++++
 3 files changed, 70 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/853ae3c7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 05b579b,7b9ccb3..443aeec
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@@ -82,48 -79,11 +82,48 @@@ public class DatasiftTweetActivitySeria
          return activity;
      }
  
 +    /**
 +     * Get the links from this tweet as a list
 +     * @param twitter
 +     * @return the links from the tweet
 +     */
 +    public List<String> getLinks(Twitter twitter) {
 +        return getLinks(twitter.getLinks());
 +    }
 +
 +    /**
 +     * Get the links from this tweet as a list
 +     * @param retweet
 +     * @return the links from the tweet
 +     */
 +    public List<String> getLinks(Retweet retweet) {
 +        return getLinks(retweet.getLinks());
 +    }
 +
 +    /**
 +     * Converts the list of objects to a list of strings
 +     * @param links
 +     * @return
 +     */
 +    private List<String> getLinks(List<Object> links) {
 +        if(links == null)
 +            return Lists.newArrayList();
 +        List<String> result = Lists.newLinkedList();
 +        for(Object obj : links) {
 +            if(obj instanceof String) {
 +                result.add((String) obj);
 +            } else {
 +                LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
 +            }
 +        }
 +        return result;
 +    }
 +
      public Actor buildActor(Datasift event, Twitter twitter) {
-         User user = twitter.getUser();
+         DatasiftTwitterUser user = twitter.getUser();
          Actor actor = super.buildActor(event.getInteraction());
          if(user == null) {
-             return retweetBuildActor(actor, twitter.getRetweet().getUser());
+             user = twitter.getRetweet().getUser();
          }
  
          actor.setDisplayName(user.getName());


[05/18] git commit: STREAMS-105 | Code review feedback

Posted by sb...@apache.org.
STREAMS-105 | Code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/949b6768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/949b6768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/949b6768

Branch: refs/heads/STREAMS-134
Commit: 949b676897aa87bba92613b694c4808efdea8c56
Parents: ce6e327
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 17:13:35 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 17:13:35 2014 -0500

----------------------------------------------------------------------
 .../DatasiftTweetActivitySerializer.java        | 29 ++++++--------------
 1 file changed, 8 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/949b6768/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index a9514ed..1fecc4b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -231,36 +231,23 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
         ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
 
-        if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+        if(mentions != null && !mentions.isEmpty()) {
             for(int x = 0; x < mentions.size(); x ++) {
                 UserMentions mention = new UserMentions();
-
-                mention.setIdStr("id:twitter:" + mentionIds.get(x));
-                mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
                 mention.setName(mentions.get(x));
                 mention.setScreenName(mentions.get(x));
 
                 userMentions.add(mention);
             }
-        } else if((mentions != null && !mentions.isEmpty()) || (mentionIds != null && !mentionIds.isEmpty())) {
-            if(mentions != null && !mentions.isEmpty()) {
-                for(int x = 0; x < mentions.size(); x ++) {
-                    UserMentions mention = new UserMentions();
-                    mention.setName(mentions.get(x));
-                    mention.setScreenName(mentions.get(x));
-
-                    userMentions.add(mention);
-                }
-            }
-            if(mentionIds != null && !mentionIds.isEmpty()) {
-                for(int x = 0; x < mentionIds.size(); x ++) {
-                    UserMentions mention = new UserMentions();
+        }
+        if(mentionIds != null && !mentionIds.isEmpty()) {
+            for(int x = 0; x < mentionIds.size(); x ++) {
+                UserMentions mention = new UserMentions();
 
-                    mention.setIdStr("id:twitter:" + mentionIds.get(x));
-                    mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+                mention.setIdStr("id:twitter:" + mentionIds.get(x));
+                mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
 
-                    userMentions.add(mention);
-                }
+                userMentions.add(mention);
             }
         }
 


[03/18] git commit: STREAMS-105 | Ensured that the user_mentions entries are correctly structured and that the activity object has the correct ID and content

Posted by sb...@apache.org.
STREAMS-105 | Ensured that the user_mentions entries are correctly structured and that the activity object has the correct ID and content


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c35c2ef7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c35c2ef7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c35c2ef7

Branch: refs/heads/STREAMS-134
Commit: c35c2ef74df06779d50bdf80a79823ade57ac1c4
Parents: d71b159
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Fri Jun 27 16:46:07 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Fri Jun 27 16:46:07 2014 -0500

----------------------------------------------------------------------
 .../DatasiftDefaultActivitySerializer.java      |  7 +++--
 .../DatasiftTweetActivitySerializer.java        | 30 ++++++++++++--------
 2 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c35c2ef7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index b8c87e5..09126dc 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -22,8 +22,8 @@ import java.util.Map;
 import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
 
 /**
-*
-*/
+ *
+ */
 public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class);
@@ -174,6 +174,9 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
         ActivityObject actObj = new ActivityObject();
         actObj.setObjectType(interaction.getContenttype());
         actObj.setUrl(interaction.getLink());
+        actObj.setId(interaction.getId());
+        actObj.setContent(interaction.getContent());
+
         return actObj;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c35c2ef7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3406eb4..465196f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -31,10 +31,12 @@ import org.apache.streams.datasift.twitter.Twitter;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.Actor;
 import org.apache.streams.pojo.json.Image;
+import org.apache.streams.twitter.pojo.UserMentions;
 import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -204,8 +206,8 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
                 }
             }
         }
-        extensions.put("hashtags", hashTags);
 
+        extensions.put("hashtags", hashTags);
 
         if(retweet != null) {
             Map<String, Object> rebroadcasts = Maps.newHashMap();
@@ -215,18 +217,22 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         }
 
         if(interaction.getAdditionalProperties() != null) {
-            Object mentionsObject = interaction.getAdditionalProperties().get("mentions");
-            if(mentionsObject != null ) {
-                if(mentionsObject instanceof List) {
-                    List mentions = (List) mentionsObject;
-                    List<Map<String, Object>> userMentions = Lists.newLinkedList();
-                    for(Object mention : mentions) {
-                        Map<String, Object> actor = Maps.newHashMap();
-                        actor.put("displayName", mention);
-                        userMentions.add(actor);
-                    }
-                    extensions.put("user_mentions", userMentions);
+            ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+            ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+            ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
+
+            if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+                for(int x = 0; x < mentions.size(); x ++) {
+                    UserMentions mention = new UserMentions();
+
+                    mention.setIdStr("id:twitter:" + mentionIds.get(x));
+                    mention.setId(mentionIds.get(x));
+                    mention.setName(mentions.get(x));
+                    mention.setScreenName(mentions.get(x));
+
+                    userMentions.add(mention);
                 }
+                extensions.put("user_mentions", userMentions);
             }
         }
         extensions.put("keywords", interaction.getContent());


[15/18] git commit: Merge PR47 branch 'rdouglas/STREAMS-105'

Posted by sb...@apache.org.
Merge PR47 branch 'rdouglas/STREAMS-105'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7e4c346
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7e4c346
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7e4c346

Branch: refs/heads/STREAMS-134
Commit: b7e4c34672238bbcd5a28cede7c4965e4e9ab49c
Parents: 853ae3c fbb8ccf
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 13:01:41 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 13:01:41 2014 -0400

----------------------------------------------------------------------
 .../serializer/DatasiftDefaultActivitySerializer.java         | 7 +++++--
 .../src/main/jsonschema/com/datasift/Datasift.json            | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7e4c346/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------


[04/18] git commit: STREAMS-105 | Code review feedback

Posted by sb...@apache.org.
STREAMS-105 | Code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ce6e3276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ce6e3276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ce6e3276

Branch: refs/heads/STREAMS-134
Commit: ce6e3276e113251b15c4655335a6887082e13315
Parents: c35c2ef
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 11:38:32 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 11:38:32 2014 -0500

----------------------------------------------------------------------
 .../DatasiftTweetActivitySerializer.java        | 141 +++++++++++--------
 .../main/jsonschema/com/datasift/Datasift.json  |   2 +-
 2 files changed, 86 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6e3276/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 465196f..a9514ed 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -81,58 +81,32 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         return activity;
     }
 
+    /**
+     * Returns the actor created from this particular event and Twitter object
+     *
+     * @param event
+     * @param twitter
+     * @return
+     */
     public Actor buildActor(Datasift event, Twitter twitter) {
         User user = twitter.getUser();
         Actor actor = super.buildActor(event.getInteraction());
-        if(user == null) {
-            return retweetBuildActor(actor, twitter.getRetweet().getUser());
-        }
-
-        actor.setDisplayName(user.getName());
-        actor.setId(formatId(Optional.fromNullable(
-                user.getIdStr())
-                .or(Optional.of(user.getId().toString()))
-                .orNull()));
-        actor.setSummary(user.getDescription());
-        try {
-            actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
-        } catch (Exception e) {
-            LOGGER.warn("Exception trying to parse date : {}", e);
-        }
 
-        if(user.getUrl() != null) {
-            actor.setUrl(user.getUrl());
-        }
-
-        Map<String, Object> extensions = new HashMap<String,Object>();
-        extensions.put("location", user.getLocation());
-        extensions.put("posts", user.getStatusesCount());
-        extensions.put("followers", user.getFollowersCount());
-        extensions.put("screenName", user.getScreenName());
-        if(user.getAdditionalProperties() != null) {
-            extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
-        }
-
-        Image profileImage = new Image();
-        String profileUrl = null;
-        profileUrl = event.getInteraction().getAuthor().getAvatar();
-        if(profileUrl == null && user.getAdditionalProperties() != null) {
-            Object url = user.getAdditionalProperties().get("profile_image_url_https");
-            if(url instanceof String)
-                profileUrl = (String) url;
-        }
-        if(profileUrl == null) {
-            profileUrl = user.getProfileImageUrl();
+        if (user == null) {
+            return userBuildActor(actor, twitter.getRetweet().getUser());
+        } else {
+            return userBuildActor(actor, user);
         }
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
-
-        actor.setAdditionalProperty("extensions", extensions);
-        return actor;
     }
 
-    //Need to make retweet user and tweet user the same object.
-    public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
+    /**
+     * Build an actor object given a User
+     *
+     * @param actor
+     * @param user
+     * @return
+     */
+    private Actor userBuildActor(Actor actor, User user) {
 
         actor.setDisplayName(user.getName());
         actor.setId(formatId(Optional.fromNullable(
@@ -169,13 +143,22 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         if(actor.getImage() == null && profileUrl == null) {
             profileUrl = user.getProfileImageUrl();
         }
-        profileImage.setUrl(profileUrl);
-        actor.setImage(profileImage);
+
+        if(profileUrl != null) {
+            profileImage.setUrl(profileUrl);
+            actor.setImage(profileImage);
+        }
 
         actor.setAdditionalProperty("extensions", extensions);
         return actor;
     }
 
+    /**
+     * Adds location attributes from the given Twitter object
+     *
+     * @param activity
+     * @param twitter
+     */
     public void addLocationExtension(Activity activity, Twitter twitter) {
         Map<String, Object> extensions = ensureExtensions(activity);
         Map<String, Object> location = Maps.newHashMap();
@@ -187,6 +170,13 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         extensions.put("location", location);
     }
 
+    /**
+     * Creates and adds Twitter Extensions from given interaction
+     *
+     * @param activity
+     * @param twitter
+     * @param interaction
+     */
     public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
         Retweet retweet = twitter.getRetweet();
         Map<String, Object> extensions = ensureExtensions(activity);
@@ -217,24 +207,63 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         }
 
         if(interaction.getAdditionalProperties() != null) {
-            ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
-            ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
-            ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
+            ArrayList<UserMentions> userMentions = createUserMentions(interaction);
+
+            if(userMentions.size() > 0)
+                extensions.put("user_mentions", userMentions);
+        }
+
+        extensions.put("keywords", interaction.getContent());
+    }
+
+    /**
+     * Returns an ArrayList of all UserMentions in this interaction
+     * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
+     * If those lists are the same size, then they will be merged into individual UserMention
+     * objects. However, if they are not the same size, a new UserMention object will be created
+     * for each entry in both lists.
+     *
+     * @param interaction
+     * @return
+     */
+    private ArrayList<UserMentions> createUserMentions(Interaction interaction) {
+        ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+        ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+        ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
 
-            if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+        if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+            for(int x = 0; x < mentions.size(); x ++) {
+                UserMentions mention = new UserMentions();
+
+                mention.setIdStr("id:twitter:" + mentionIds.get(x));
+                mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+                mention.setName(mentions.get(x));
+                mention.setScreenName(mentions.get(x));
+
+                userMentions.add(mention);
+            }
+        } else if((mentions != null && !mentions.isEmpty()) || (mentionIds != null && !mentionIds.isEmpty())) {
+            if(mentions != null && !mentions.isEmpty()) {
                 for(int x = 0; x < mentions.size(); x ++) {
                     UserMentions mention = new UserMentions();
-
-                    mention.setIdStr("id:twitter:" + mentionIds.get(x));
-                    mention.setId(mentionIds.get(x));
                     mention.setName(mentions.get(x));
                     mention.setScreenName(mentions.get(x));
 
                     userMentions.add(mention);
                 }
-                extensions.put("user_mentions", userMentions);
+            }
+            if(mentionIds != null && !mentionIds.isEmpty()) {
+                for(int x = 0; x < mentionIds.size(); x ++) {
+                    UserMentions mention = new UserMentions();
+
+                    mention.setIdStr("id:twitter:" + mentionIds.get(x));
+                    mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+
+                    userMentions.add(mention);
+                }
             }
         }
-        extensions.put("keywords", interaction.getContent());
+
+        return userMentions;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6e3276/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index e7b664a..c30fa8f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -738,7 +738,7 @@
                     "type": "array",
                     "items": [
                         {
-                            "type": "integer"
+                            "type": "string"
                         }
                     ]
                 },


[17/18] git commit: percolate processor - tested with streams-examples/elasticsearch-tag

Posted by sb...@apache.org.
percolate processor - tested with streams-examples/elasticsearch-tag


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7df265f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7df265f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7df265f5

Branch: refs/heads/STREAMS-134
Commit: 7df265f5436d167c0d80d3205b822f67c904bac1
Parents: b3d849d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 20:57:28 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:40 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |  28 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../elasticsearch/PercolateProcessor.java       | 294 ---------------
 .../processor/PercolateTagProcessor.java        | 353 +++++++++++++++++++
 .../ElasticsearchWriterConfiguration.json       |  22 +-
 streams-pojo/pom.xml                            |   6 +
 .../org/apache/streams/data/util/JsonUtil.java  |  60 +++-
 7 files changed, 424 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 26033de..1c66789 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -78,28 +78,14 @@ public class ElasticsearchConfigurator {
 
     public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
 
-        ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class);
-
-        String index = elasticsearch.getString("index");
-        String type = elasticsearch.getString("type");
-        Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
-
-        if( elasticsearch.hasPath("bulk"))
-            elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
-
-        if( elasticsearch.hasPath("batchSize"))
-            elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
-
-        if( elasticsearch.hasPath("batchBytes"))
-            elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
-
-
-        elasticsearchWriterConfiguration.setIndex(index);
-        elasticsearchWriterConfiguration.setType(type);
-        elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
-
+        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = null;
 
+        try {
+            elasticsearchWriterConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchWriterConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse elasticsearchwriterconfiguration");
+        }
         return elasticsearchWriterConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..602172e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -128,14 +128,14 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
 
     private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private ElasticsearchConfiguration config;
+    private ElasticsearchWriterConfiguration config;
 
     public ElasticsearchPersistUpdater() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
     }
 
-    public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
+    public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
         this.config = config;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
deleted file mode 100644
index c5d9f4c..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose              URL
- * -------------        ----------------------------------------------------------------
- * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class PercolateProcessor implements StreamsProcessor {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
-
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected Queue<StreamsDatum> inQueue;
-    protected Queue<StreamsDatum> outQueue;
-
-    public String TAGS_EXTENSION = "tags";
-
-    private ElasticsearchWriterConfiguration config;
-    private ElasticsearchClientManager manager;
-    private BulkRequestBuilder bulkBuilder;
-
-    public PercolateProcessor(ElasticsearchConfiguration config) {
-        manager = new ElasticsearchClientManager(config);
-    }
-
-    public ElasticsearchClientManager getManager() {
-        return manager;
-    }
-
-    public void setManager(ElasticsearchClientManager manager) {
-        this.manager = manager;
-    }
-
-    public ElasticsearchConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(ElasticsearchWriterConfiguration config) {
-        this.config = config;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        String json;
-        ObjectNode node;
-        // first check for valid json
-        if (entry.getDocument() instanceof String) {
-            json = (String) entry.getDocument();
-            try {
-                node = (ObjectNode) mapper.readTree(json);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return null;
-            }
-        } else {
-            node = (ObjectNode) entry.getDocument();
-            json = node.asText();
-        }
-
-        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
-
-        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
-
-        for (PercolateResponse.Match match : response.getMatches()) {
-            tagArray.add(match.getId().string());
-        }
-
-        Activity activity = mapper.convertValue(node, Activity.class);
-
-        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
-        extensions.put(TAGS_EXTENSION, tagArray);
-
-        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
-
-        result.add(entry);
-
-        return result;
-
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(config.getTags());
-        Preconditions.checkArgument(config.getTags().size() > 0);
-
-        // consider using mapping to figure out what fields are included in _all
-        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
-
-        deleteOldQueries(config.getIndex());
-        for (Tag tag : config.getTags()) {
-            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
-            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
-            addPercolateRule(queryBuilder, config.getIndex());
-        }
-        if (writePercolateRules() == true)
-            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
-        else
-            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
-
-
-    }
-
-    @Override
-    public void cleanUp() {
-        manager.getClient().close();
-    }
-
-    public int numOfPercolateRules() {
-        return this.bulkBuilder.numberOfActions();
-    }
-
-    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
-        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
-    }
-
-    /**
-     *
-     * @return returns true if all rules were addded. False indicates one or more rules have failed.
-     */
-    public boolean writePercolateRules() {
-        if(this.numOfPercolateRules() < 0) {
-            throw new RuntimeException("No Rules Have been added!");
-        }
-        BulkResponse response = this.bulkBuilder.execute().actionGet();
-        for(BulkItemResponse r : response.getItems()) {
-            if(r.isFailed()) {
-                System.out.println(r.getId()+"\t"+r.getFailureMessage());
-            }
-        }
-        return !response.hasFailures();
-    }
-
-    /**
-     *
-     * @param ids
-     * @param index
-     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
-     */
-    public boolean removeOldTags(Set<String> ids, String index) {
-        if(ids.size() == 0) {
-            return false;
-        }
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String id : ids) {
-            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
-        }
-        return !bulk.execute().actionGet().hasFailures();
-    }
-
-    public Set<String> getActivePercolateTags(String index) {
-        Set<String> tags = new HashSet<String>();
-        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
-        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
-        SearchHits hits = response.getHits();
-        for(SearchHit hit : hits.getHits()) {
-            tags.add(hit.id());
-        }
-        return tags;
-    }
-
-    /**
-     *
-     * @param index
-     * @return
-     */
-    public boolean deleteOldQueries(String index) {
-        Set<String> tags = getActivePercolateTags(index);
-        if(tags.size() == 0) {
-            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
-            return false;
-        }
-        LOGGER.info("Deleting {} tags.", tags.size());
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String tag : tags) {
-            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
-        }
-        BulkResponse response =bulk.execute().actionGet();
-        return !response.hasFailures();
-    }
-
-    public static class PercolateQueryBuilder {
-
-        private BoolQueryBuilder queryBuilder;
-        private String id;
-
-        public PercolateQueryBuilder(String id) {
-            this.id = id;
-            this.queryBuilder = QueryBuilders.boolQuery();
-        }
-
-        public void setMinumumNumberShouldMatch(int shouldMatch) {
-            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
-        }
-
-
-        public void addQuery(String query, FilterLevel level, String... fields) {
-            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
-            if(fields != null && fields.length > 0) {
-                for(String field : fields) {
-                    builder.field(field);
-                }
-            }
-            switch (level) {
-                case MUST:
-                    this.queryBuilder.must(builder);
-                    break;
-                case SHOULD:
-                    this.queryBuilder.should(builder);
-                    break;
-                case MUST_NOT:
-                    this.queryBuilder.mustNot(builder);
-            }
-        }
-
-        public String getId() {
-            return this.id;
-        }
-
-        public String getSource() {
-            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
-        }
-
-
-    }
-
-    public enum FilterLevel {
-        MUST, SHOULD, MUST_NOT
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
new file mode 100644
index 0000000..075d2b2
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.percolate.PercolateRequestBuilder;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose              URL
+ * -------------        ----------------------------------------------------------------
+ * [Status Codes]       http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases]         http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class PercolateTagProcessor implements StreamsProcessor {
+
+    public static final String STREAMS_ID = "PercolateTagProcessor";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> inQueue;
+    protected Queue<StreamsDatum> outQueue;
+
+    public String TAGS_EXTENSION = "tags";
+
+    private ElasticsearchWriterConfiguration config;
+    private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
+
+    public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+        manager = new ElasticsearchClientManager(config);
+    }
+
+    public ElasticsearchClientManager getManager() {
+        return manager;
+    }
+
+    public void setManager(ElasticsearchClientManager manager) {
+        this.manager = manager;
+    }
+
+    public ElasticsearchConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(ElasticsearchWriterConfiguration config) {
+        this.config = config;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        List<StreamsDatum> result = Lists.newArrayList();
+
+        String json;
+        ObjectNode node;
+        // first check for valid json
+        if (entry.getDocument() instanceof String) {
+            json = (String) entry.getDocument();
+            try {
+                node = (ObjectNode) mapper.readTree(json);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        } else {
+            node = (ObjectNode) entry.getDocument();
+            try {
+                json = mapper.writeValueAsString(node);
+            } catch (JsonProcessingException e) {
+                LOGGER.warn("Invalid datum: ", node);
+                return null;
+            }
+        }
+
+        StringBuilder percolateRequestJson = new StringBuilder();
+        percolateRequestJson.append("{ \"doc\": ");
+        percolateRequestJson.append(json);
+        //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }");
+        percolateRequestJson.append("}");
+
+        PercolateRequestBuilder request;
+        PercolateResponse response;
+
+        try {
+            LOGGER.trace("Percolate request json: {}", percolateRequestJson.toString());
+            request = manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString());
+            LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
+            response = request.execute().actionGet();
+            LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
+        } catch (Exception e) {
+            LOGGER.warn("Percolate exception: {}", e.getMessage());
+            return null;
+        }
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+
+        Iterator<PercolateResponse.Match> matchIterator = response.iterator();
+        while(matchIterator.hasNext()) {
+            tagArray.add(matchIterator.next().getId().string());
+        }
+
+        LOGGER.trace("Percolate matches: {}", tagArray);
+
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+
+        entry.setDocument(activity);
+
+        result.add(entry);
+
+        return result;
+
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        //deleteOldQueries(config.getIndex());
+        bulkBuilder = manager.getClient().prepareBulk();
+        for (String tag : config.getTags().getAdditionalProperties().keySet()) {
+            String query = (String)config.getTags().getAdditionalProperties().get(tag);
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query);
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules() == true)
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+        else
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+
+
+    }
+
+    @Override
+    public void cleanUp() {
+        deleteOldQueries(config.getIndex());
+        manager.getClient().close();
+    }
+
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
+
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
+                .setSource(builder.getSource()));
+    }
+
+    /**
+     *
+     * @return returns true if all rules were addded. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 0) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                System.out.println(r.getId()+"\t"+r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
+
+    /**
+     *
+     * @param ids
+     * @param index
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
+
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
+
+    /**
+     *
+     * @param index
+     * @return
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+//    public static class PercolateQueryBuilder {
+//
+//        private BoolQueryBuilder queryBuilder;
+//        private String id;
+//
+//        public PercolateQueryBuilder(String id) {
+//            this.id = id;
+//            this.queryBuilder = QueryBuilders.boolQuery();
+//        }
+//
+//        public void setMinumumNumberShouldMatch(int shouldMatch) {
+//            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+//        }
+//
+//
+//        public void addQuery(String query, FilterLevel level, String... fields) {
+//            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+//            if(fields != null && fields.length > 0) {
+//                for(String field : fields) {
+//                    builder.field(field);
+//                }
+//            }
+//            switch (level) {
+//                case MUST:
+//                    this.queryBuilder.must(builder);
+//                    break;
+//                case SHOULD:
+//                    this.queryBuilder.should(builder);
+//                    break;
+//                case MUST_NOT:
+//                    this.queryBuilder.mustNot(builder);
+//            }
+//        }
+//
+//        public String getId() {
+//            return this.id;
+//        }
+//
+//        public String getSource() {
+//            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+//        }
+//
+//
+//    }
+
+    public static class PercolateQueryBuilder {
+
+        private QueryStringQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id, String query) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.queryString(query);
+        }
+
+        public String getId() {
+            return this.id;
+        }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+    }
+
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index bf7b146..cd23fe2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -32,23 +32,13 @@
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
 		},
+        "script": {
+            "type": "string",
+            "description": "Script to execute during index"
+        },
         "tags": {
-            "type": "array",
-            "description": "Tags to apply",
-            "items": {
-                "type": "object",
-                "description": "Tag to apply",
-                "properties": {
-                    "id": {
-                        "type": "string",
-                        "description": "Tag identifier"
-                    },
-                    "query": {
-                        "type": "string",
-                        "description": "Tag query"
-                    }
-                }
-            }
+            "type": "object",
+            "description": "Tags to apply during index"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index a3a12f6..ca6d953 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -85,6 +85,12 @@
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
index d49ef2a..79ac555 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
@@ -24,9 +24,15 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import java.io.*;
-import java.util.List;
+import java.util.*;
 
 /**
  * JSON utilities
@@ -35,10 +41,10 @@ public class JsonUtil {
 
     private JsonUtil() {}
 
-    public static JsonNode jsonToJsonNode(String json) {
-        ObjectMapper mapper = new ObjectMapper();
-        JsonFactory factory = mapper.getFactory();
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private static JsonFactory factory = mapper.getFactory();
 
+    public static JsonNode jsonToJsonNode(String json) {
         JsonNode node;
         try {
             JsonParser jp = factory.createJsonParser(json);
@@ -50,7 +56,6 @@ public class JsonUtil {
     }
 
     public static String jsonNodeToJson(JsonNode node) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.writeValueAsString(node);
         } catch (JsonProcessingException e) {
@@ -59,7 +64,6 @@ public class JsonUtil {
     }
 
     public static <T> T jsonToObject(String json, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.readValue(json, clazz);
         } catch (IOException e) {
@@ -68,22 +72,18 @@ public class JsonUtil {
     }
 
     public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.convertValue(node, clazz);
     }
 
     public static <T> JsonNode objectToJsonNode(T obj) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.valueToTree(obj);
     }
 
     public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) {
-        ObjectMapper mapper = new ObjectMapper();
         return mapper.convertValue(node, new TypeReference<List<T>>() {});
     }
 
     public static <T> String objectToJson(T object) {
-        ObjectMapper mapper = new ObjectMapper();
         try {
             return mapper.writeValueAsString(object);
         } catch (IOException e) {
@@ -96,7 +96,6 @@ public class JsonUtil {
     }
 
     public static JsonNode getFromFile(String filePath) {
-        ObjectMapper mapper = new ObjectMapper();
         JsonFactory factory = mapper.getFactory(); // since 2.1 use mapper.getFactory() instead
 
         JsonNode node = null;
@@ -123,4 +122,43 @@ public class JsonUtil {
 
         return stream;
     }
+
+    /**
+     * Creates an empty array if missing
+     * @param node object to create the array within
+     * @param field location to create the array
+     * @return the Map representing the extensions property
+     */
+    public static ArrayNode ensureArray(ObjectNode node, String field) {
+        String[] path = Lists.newArrayList(Splitter.on('.').split(field)).toArray(new String[0]);
+        ObjectNode current = node;
+        ArrayNode result = null;
+        for( int i = 0; i < path.length; i++) {
+            current = ensureObject((ObjectNode) node.get(path[i]), path[i]);
+        }
+        if (current.get(field) == null)
+            current.put(field, mapper.createArrayNode());
+        result = (ArrayNode) node.get(field);
+        return result;
+    }
+
+    /**
+     * Creates an empty array if missing
+     * @param node objectnode to create the object within
+     * @param field location to create the object
+     * @return the Map representing the extensions property
+     */
+    public static ObjectNode ensureObject(ObjectNode node, String field) {
+        String[] path = Lists.newArrayList(Splitter.on('.').split(field)).toArray(new String[0]);
+        ObjectNode current = node;
+        ObjectNode result = null;
+        for( int i = 0; i < path.length; i++) {
+            if (node.get(field) == null)
+                node.put(field, mapper.createObjectNode());
+            current = (ObjectNode) node.get(field);
+        }
+        result = ensureObject((ObjectNode) node.get(path[path.length]), Joiner.on('.').join(Arrays.copyOfRange(path, 1, path.length)));
+        return result;
+    }
+
 }


[18/18] git commit: Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134

Posted by sb...@apache.org.
Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134

Conflicts:
	streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
	streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f1bc422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f1bc422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f1bc422

Branch: refs/heads/STREAMS-134
Commit: 2f1bc422612d3d6030a48bd956ba9b0c1994855c
Parents: 7df265f 4e9a2bb
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 21:00:11 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 21:00:11 2014 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[16/18] git commit: percolate implementation, untested

Posted by sb...@apache.org.
percolate implementation, untested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3d849d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3d849d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3d849d6

Branch: refs/heads/STREAMS-134
Commit: b3d849d65b4137c84f9ed12d86322d14758f9b3f
Parents: b7e4c34
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:39 2014 -0500

----------------------------------------------------------------------
 .../elasticsearch/PercolateProcessor.java       | 205 +++++++++++++++----
 .../ElasticsearchWriterConfiguration.json       |  20 +-
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
  * [t.co behavior]      https://dev.twitter.com/docs/tco-redirection-behavior
  */
 
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
     private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected Queue<StreamsDatum> inQueue;
     protected Queue<StreamsDatum> outQueue;
 
+    public String TAGS_EXTENSION = "tags";
+
     private ElasticsearchWriterConfiguration config;
     private ElasticsearchClientManager manager;
+    private BulkRequestBuilder bulkBuilder;
 
-    public PercolateProcessor(Queue<StreamsDatum> inQueue) {
-        this.inQueue = inQueue;
-        this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+    public PercolateProcessor(ElasticsearchConfiguration config) {
+        manager = new ElasticsearchClientManager(config);
     }
 
     public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.manager = manager;
     }
 
-    public ElasticsearchWriterConfiguration getConfig() {
+    public ElasticsearchConfiguration getConfig() {
         return config;
     }
 
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
         this.config = config;
     }
 
-    public void start() {
-        Preconditions.checkNotNull(config);
-        Preconditions.checkNotNull(manager);
-        Preconditions.checkNotNull(manager.getClient());
-    }
-
-    public void stop() {
-
-    }
-
     public Queue<StreamsDatum> getProcessorOutputQueue() {
         return outQueue;
     }
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
             json = node.asText();
         }
 
-        PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+        PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
 
         ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
         for (PercolateResponse.Match match : response.getMatches()) {
             tagArray.add(match.getId().string());
-
         }
 
-        // need utility methods for get / create specific node
-        ObjectNode extensions = (ObjectNode) node.get("extensions");
-        ObjectNode w2o = (ObjectNode) extensions.get("w2o");
-        w2o.put("tags", tagArray);
+        Activity activity = mapper.convertValue(node, Activity.class);
+
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+        extensions.put(TAGS_EXTENSION, tagArray);
+
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
 
         result.add(entry);
 
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
 
     @Override
     public void prepare(Object o) {
-        start();
+
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+        deleteOldQueries(config.getIndex());
+        for (Tag tag : config.getTags()) {
+            PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+            queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+            addPercolateRule(queryBuilder, config.getIndex());
+        }
+        if (writePercolateRules() == true)
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+        else
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
+        manager.getClient().close();
     }
 
-    @Override
-    public void run() {
+    public int numOfPercolateRules() {
+        return this.bulkBuilder.numberOfActions();
+    }
 
-        while (true) {
-            StreamsDatum item;
-            try {
-                item = inQueue.poll();
+    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+        this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+    }
 
-                Thread.sleep(new Random().nextInt(100));
+    /**
+     *
+     * @return returns true if all rules were addded. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 0) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                System.out.println(r.getId()+"\t"+r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
 
-                for (StreamsDatum entry : process(item)) {
-                    outQueue.offer(entry);
-                }
+    /**
+     *
+     * @param ids
+     * @param index
+     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
 
+    public Set<String> getActivePercolateTags(String index) {
+        Set<String> tags = new HashSet<String>();
+        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+        SearchHits hits = response.getHits();
+        for(SearchHit hit : hits.getHits()) {
+            tags.add(hit.id());
+        }
+        return tags;
+    }
 
-            } catch (Exception e) {
-                e.printStackTrace();
+    /**
+     *
+     * @param index
+     * @return
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> tags = getActivePercolateTags(index);
+        if(tags.size() == 0) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", tags.size());
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String tag : tags) {
+            bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+        }
+        BulkResponse response =bulk.execute().actionGet();
+        return !response.hasFailures();
+    }
+
+    public static class PercolateQueryBuilder {
+
+        private BoolQueryBuilder queryBuilder;
+        private String id;
+
+        public PercolateQueryBuilder(String id) {
+            this.id = id;
+            this.queryBuilder = QueryBuilders.boolQuery();
+        }
 
+        public void setMinumumNumberShouldMatch(int shouldMatch) {
+            this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+        }
+
+
+        public void addQuery(String query, FilterLevel level, String... fields) {
+            QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+            if(fields != null && fields.length > 0) {
+                for(String field : fields) {
+                    builder.field(field);
+                }
             }
+            switch (level) {
+                case MUST:
+                    this.queryBuilder.must(builder);
+                    break;
+                case SHOULD:
+                    this.queryBuilder.should(builder);
+                    break;
+                case MUST_NOT:
+                    this.queryBuilder.mustNot(builder);
+            }
+        }
+
+        public String getId() {
+            return this.id;
         }
+
+        public String getSource() {
+            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+        }
+
+
     }
+
+    public enum FilterLevel {
+        MUST, SHOULD, MUST_NOT
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
 		},
 		"maxTimeBetweenFlushMs": {
 			"type": "integer"
-		}
+		},
+        "tags": {
+            "type": "array",
+            "description": "Tags to apply",
+            "items": {
+                "type": "object",
+                "description": "Tag to apply",
+                "properties": {
+                    "id": {
+                        "type": "string",
+                        "description": "Tag identifier"
+                    },
+                    "query": {
+                        "type": "string",
+                        "description": "Tag query"
+                    }
+                }
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.data.util;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
  * Utility class for managing activities
  */
 public class ActivityUtil {
+
     private ActivityUtil() {}
 
     /**
@@ -58,6 +61,8 @@ public class ActivityUtil {
      */
     public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
 
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
     /**
      * Creates a standard extension property
      * @param activity activity to create the property in


[08/18] git commit: STREAMS-105 | Code review feedback

Posted by sb...@apache.org.
STREAMS-105 | Code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/abaf5b00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/abaf5b00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/abaf5b00

Branch: refs/heads/STREAMS-134
Commit: abaf5b00b3cfb196ff736029a8dc44d3c90b7a86
Parents: 26f3614
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jul 7 13:23:11 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jul 7 13:23:11 2014 -0500

----------------------------------------------------------------------
 .../datasift/serializer/DatasiftTweetActivitySerializer.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/abaf5b00/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 0482235..158d027 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -165,7 +165,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
         double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
         Map<String, Object> coords = Maps.newHashMap();
         coords.put("coordinates", coordiantes);
-        coords.put("type", "point");
+        coords.put("type", "Point");
         location.put("coordinates", coords);
         extensions.put("location", location);
     }


[09/18] git commit: Merge pull request #7 from apache/master

Posted by sb...@apache.org.
Merge pull request #7 from apache/master

Merge Apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/af1f2943
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/af1f2943
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/af1f2943

Branch: refs/heads/STREAMS-134
Commit: af1f2943c765536ac0424b008d9a5f92984ca984
Parents: 3b98df8 c50ce91
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Tue Jul 8 13:35:46 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Tue Jul 8 13:35:46 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/README.md     |  38 +++
 .../ElasticsearchConfigurator.java              |  14 +
 .../elasticsearch/ElasticsearchQuery.java       |  34 ++-
 .../ElasticsearchReaderConfiguration.json       |   5 +
 .../src/main/resources/reference.json           |   9 +
 streams-contrib/streams-persist-mongo/README.md |  16 ++
 .../streams/mongo/MongoPersistReader.java       | 270 +++++++++++++++++++
 .../src/main/resources/reference.json           |   8 +
 .../src/main/resources/reference.properties     |  10 -
 .../twitter/processor/TwitterTypeConverter.java |   2 +-
 .../TwitterJsonActivitySerializer.java          |   3 +
 .../TwitterJsonUserActivitySerializer.java      |  72 +++++
 .../serializer/util/TwitterActivityUtil.java    |  22 ++
 .../streams/local/builders/StreamComponent.java |   2 +-
 .../local/tasks/StreamsProviderTask.java        |   3 +-
 15 files changed, 490 insertions(+), 18 deletions(-)
----------------------------------------------------------------------



[12/18] git commit: Merge pull request #8 from apache/master

Posted by sb...@apache.org.
Merge pull request #8 from apache/master

Merge Apache

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5c9a531d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5c9a531d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5c9a531d

Branch: refs/heads/STREAMS-134
Commit: 5c9a531d9227f44133fe67aea3f90ebbaf3de020
Parents: af1f294 38ed41a
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Thu Jul 10 10:15:52 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Thu Jul 10 10:15:52 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistReader.java             |  3 +-
 .../DatasiftTypeConverterProcessor.java         | 12 +-----
 .../DatasiftTweetActivitySerializer.java        | 42 +++++++++++++++++++-
 .../provider/SysomosHeartbeatStream.java        | 10 +++++
 .../sysomos/provider/SysomosProvider.java       |  4 +-
 5 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------