You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/16 04:01:53 UTC
[01/18] git commit: Merge pull request #4 from apache/master
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-134 4e9a2bb57 -> 2f1bc4226
Merge pull request #4 from apache/master
Merge Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2ee31575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2ee31575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2ee31575
Branch: refs/heads/STREAMS-134
Commit: 2ee31575eba481bf77c98a9edfe3d04eae333f16
Parents: 829f805 a33c215
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 23 14:03:00 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 23 14:03:00 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/s3/S3PersistReader.java | 9 +-
.../streams/console/ConsolePersistReader.java | 23 +
.../streams/console/ConsolePersistWriter.java | 18 +
.../elasticsearch/ElasticsearchClient.java | 20 +-
.../ElasticsearchClientManager.java | 18 +
.../ElasticsearchConfigurator.java | 18 +
.../ElasticsearchPersistReader.java | 26 +-
.../ElasticsearchPersistUpdater.java | 18 +
.../ElasticsearchPersistWriter.java | 1 +
.../ElasticsearchPersistWriterTask.java | 18 +
.../elasticsearch/ElasticsearchQuery.java | 20 +-
.../elasticsearch/PercolateProcessor.java | 20 +-
.../apache/streams/hbase/HbaseConfigurator.java | 18 +
.../streams/hbase/HbasePersistWriter.java | 18 +
.../streams/hbase/HbasePersistWriterTask.java | 18 +
.../apache/streams/hdfs/HdfsConfigurator.java | 18 +
.../streams/hdfs/WebHdfsPersistReader.java | 27 +-
.../streams/hdfs/WebHdfsPersistReaderTask.java | 18 +
.../streams/hdfs/WebHdfsPersistWriter.java | 18 +
.../streams/hdfs/WebHdfsPersistWriterTask.java | 18 +
.../apache/streams/kafka/KafkaConfigurator.java | 18 +
.../streams/kafka/KafkaPersistReader.java | 23 +
.../streams/kafka/KafkaPersistReaderTask.java | 18 +
.../streams/kafka/KafkaPersistWriter.java | 18 +
.../streams/kafka/KafkaPersistWriterTask.java | 18 +
.../streams/kafka/StreamsPartitioner.java | 18 +
.../apache/streams/mongo/MongoConfigurator.java | 18 +
.../streams/mongo/MongoPersistWriter.java | 18 +
.../apache/streams/json/JsonPathExtractor.java | 5 +-
.../org/apache/streams/urls/LinkResolver.java | 3 +-
.../urls/LinkResolverHelperFunctions.java | 1 +
.../streams/urls/LinkResolverProcessor.java | 3 +-
.../streams/urls/LinkHelperFunctionsTest.java | 18 +
.../streams/urls/TestLinkUnwinderProcessor.java | 18 +
.../streams-provider-datasift/pom.xml | 12 +-
.../streams/datasift/csdl/DatasiftCsdlUtil.java | 114 +
.../datasift/provider/DatasiftConverter.java | 1 +
.../provider/DatasiftStreamConfigurator.java | 2 +-
.../provider/DatasiftStreamProvider.java | 7 +-
.../DatasiftTypeConverterProcessor.java | 75 +-
.../serializer/DatasiftActivitySerializer.java | 193 +-
.../DatasiftDefaultActivitySerializer.java | 211 +
.../DatasiftTweetActivitySerializer.java | 234 +
.../main/jsonschema/com/datasift/Datasift.json | 44 +-
.../com/datasift/test/DatasiftSerDeTest.java | 37 +-
.../provider/DatasiftStreamProviderTest.java | 18 +
.../DatasiftTypeConverterProcessorTest.java | 20 +-
.../datasift/provider/ErrorHandlerTest.java | 18 +
.../datasift/provider/SubscriptionTest.java | 18 +
.../DatasiftActivitySerializerTest.java | 74 +
.../src/test/resources/amazon_datasift_json.txt | 10 +
.../src/test/resources/blog_datasift_json.txt | 719 +++
.../src/test/resources/board_datasift_json.txt | 4160 ++++++++++++++++++
.../test/resources/facebook_datasift_json.txt | 1843 ++++++++
.../src/test/resources/part-r-00000.json | 1724 ++++----
.../resources/rand_sample_datasift_json.txt | 1547 +++++++
.../resources/random_sample_datasift_json.txt | 1547 +++++++
.../src/test/resources/reddit_datasift_json.txt | 33 +
.../test/resources/twitter_datasift_json.txt | 1000 +++++
.../test/resources/wikipedia_datasift_json.txt | 252 ++
.../test/resources/youtube_datasift_json.txt | 7 +
...FacebookPublicFeedXmlActivitySerializer.java | 18 +
.../FacebookPostActivitySerializerTest.java | 1 +
.../facebook/test/FacebookPostSerDeTest.java | 18 +
.../facebook/test/FacebookEDCSerDeTest.java | 18 +
.../gnip/flickr/test/FlickrEDCSerDeTest.java | 18 +
.../com/gplus/api/GPlusActivitySerializer.java | 18 +
.../com/gplus/api/GPlusEDCAsActivityTest.java | 18 +
.../com/instagram/test/InstagramSerDeTest.java | 18 +
.../reddit/api/RedditActivitySerializer.java | 18 +
.../reddit/api/RedditEDCAsActivityJSONTest.java | 18 +
.../java/com/gnip/test/YouTubeEDCSerDeTest.java | 20 +-
.../com/gnip/test/YoutubeEDCAsActivityTest.java | 2 +-
.../ActivityXMLActivitySerializer.java | 18 +
.../PowerTrackActivitySerializer.java | 18 +
.../test/PowerTrackDeserializationTest.java | 18 +
.../com/google/gmail/GMailConfigurator.java | 18 +
.../gmail/provider/GMailImapProviderTask.java | 18 +
.../GMailMessageActivitySerializer.java | 18 +
.../google/gmail/provider/GMailProvider.java | 26 +-
.../gmail/provider/GMailRssProviderTask.java | 18 +
.../gplus/provider/GPlusActivitySerializer.java | 18 +
.../gplus/provider/GPlusConfigurator.java | 18 +
.../gplus/provider/GPlusEventProcessor.java | 18 +
.../provider/GPlusHistoryProviderTask.java | 18 +
.../google/gplus/provider/GPlusProvider.java | 23 +
.../gmail/test/GMailMessageSerDeTest.java | 18 +
.../data/MoreoverJsonActivitySerializer.java | 18 +
.../data/MoreoverXmlActivitySerializer.java | 18 +
.../streams/data/moreover/MoreoverClient.java | 18 +
.../data/moreover/MoreoverConfigurator.java | 18 +
.../streams/data/moreover/MoreoverProvider.java | 23 +
.../data/moreover/MoreoverProviderTask.java | 18 +
.../streams/data/moreover/MoreoverResult.java | 18 +
.../data/moreover/MoreoverResultSetWrapper.java | 18 +
.../apache/streams/data/util/MoreoverUtils.java | 18 +
.../MoreoverJsonActivitySerializerTest.java | 1 +
.../data/MoreoverXmlActivitySerializerTest.java | 18 +
.../rss/provider/RssEventClassifier.java | 18 +
.../streams/rss/provider/RssEventProcessor.java | 18 +
.../rss/provider/RssStreamConfigurator.java | 18 +
.../streams/rss/provider/RssStreamProvider.java | 24 +
.../rss/provider/RssStreamProviderTask.java | 18 +
.../serializer/SyndEntryActivitySerializer.java | 18 +
.../streams/rss/test/Top100FeedsTest.java | 18 +
.../sysomos/config/SysomosConfigurator.java | 39 +
.../sysomos/proessor/SysomosTypeConverter.java | 56 -
.../sysomos/processor/SysomosTypeConverter.java | 56 +
.../sysomos/provider/ContentRequestBuilder.java | 1 +
.../provider/SysomosHeartbeatStream.java | 130 +-
.../sysomos/provider/SysomosProvider.java | 50 +-
.../streams/sysomos/util/SysomosUtils.java | 2 +-
.../com/sysomos/test/SysomosJsonSerDeTest.java | 18 +
.../com/sysomos/test/SysomosXmlSerDeTest.java | 18 +
.../processor/TwitterEventProcessor.java | 18 +
.../processor/TwitterProfileProcessor.java | 31 +-
.../twitter/processor/TwitterTypeConverter.java | 18 +
.../twitter/provider/TwitterErrorHandler.java | 20 +-
.../provider/TwitterEventClassifier.java | 18 +
.../provider/TwitterStreamConfigurator.java | 18 +
.../twitter/provider/TwitterStreamProvider.java | 23 +
.../provider/TwitterStreamProviderTask.java | 18 +
.../provider/TwitterTimelineProvider.java | 32 +-
.../provider/TwitterTimelineProviderTask.java | 18 +
.../TwitterUserInformationProvider.java | 12 +-
.../serializer/StreamsTwitterMapper.java | 18 +
.../TwitterJsonActivitySerializer.java | 18 +
.../TwitterJsonDeleteActivitySerializer.java | 18 +
.../TwitterJsonRetweetActivitySerializer.java | 18 +
.../TwitterJsonTweetActivitySerializer.java | 18 +
...erJsonUserstreameventActivitySerializer.java | 18 +
.../streams/twitter/test/SimpleTweetTest.java | 18 +
.../twitter/test/TweetActivitySerDeTest.java | 18 +
.../streams/twitter/test/TweetSerDeTest.java | 18 +
.../test/TwitterEventClassifierTest.java | 18 +
.../twitter/test/TwitterStreamProviderTest.java | 18 +
.../org/apache/streams/core/DatumStatus.java | 18 +
.../streams/core/DatumStatusCountable.java | 18 +
.../apache/streams/core/DatumStatusCounter.java | 18 +
.../org/apache/streams/core/StreamBuilder.java | 18 +
.../org/apache/streams/core/StreamHandler.java | 18 +
.../org/apache/streams/core/StreamState.java | 18 +
.../apache/streams/core/StreamsOperation.java | 18 +
.../apache/streams/core/StreamsProvider.java | 38 +-
.../apache/streams/data/util/RFC3339Utils.java | 5 +
.../ActivityDeserializerException.java | 18 +
.../exceptions/ActivitySerializerException.java | 18 +
.../jackson/StreamsDateTimeDeserializer.java | 20 +-
.../jackson/StreamsDateTimeSerializer.java | 18 +
.../streams/jackson/StreamsJacksonMapper.java | 22 +-
.../streams/jackson/StreamsJacksonModule.java | 18 +
.../jackson/StreamsPeriodDeserializer.java | 18 +
.../jackson/StreamsPeriodSerializer.java | 20 +-
.../data/data/util/DateTimeSerDeTest.java | 18 +
.../local/builders/InvalidStreamException.java | 18 +
.../local/builders/LocalStreamBuilder.java | 18 +
.../streams/local/builders/StreamComponent.java | 18 +
.../streams/local/tasks/BaseStreamsTask.java | 18 +
.../tasks/LocalStreamProcessMonitorThread.java | 18 +
.../tasks/StatusCounterMonitorRunnable.java | 18 +
.../local/tasks/StatusCounterMonitorThread.java | 18 +
.../streams/local/tasks/StreamsMergeTask.java | 18 +
.../local/tasks/StreamsPersistWriterTask.java | 18 +
.../local/tasks/StreamsProcessorTask.java | 18 +
.../local/tasks/StreamsProviderTask.java | 18 +
.../apache/streams/local/tasks/StreamsTask.java | 18 +
.../local/builders/LocalStreamBuilderTest.java | 18 +
.../local/builders/ToyLocalBuilderExample.java | 18 +
.../streams/local/tasks/BasicTasksTest.java | 18 +
.../test/processors/DoNothingProcessor.java | 18 +
.../PassthroughDatumCounterProcessor.java | 18 +
.../test/providers/EmptyResultSetProvider.java | 5 +
.../test/providers/NumericMessageProvider.java | 23 +
.../local/test/writer/DatumCounterWriter.java | 18 +
.../local/test/writer/DoNothingWriter.java | 18 +
.../local/test/writer/SystemOutWriter.java | 18 +
.../component/ExpectedDatumsPersistWriter.java | 18 +
.../test/component/FileReaderProvider.java | 23 +
.../test/component/StreamsDatumConverter.java | 18 +
.../component/StringToDocumentConverter.java | 18 +
.../tests/TestComponentsLocalStream.java | 18 +
.../tests/TestExpectedDatumsPersitWriter.java | 18 +
.../component/tests/TestFileReaderProvider.java | 18 +
.../trident/StreamsPersistWriterState.java | 18 +
.../storm/trident/StreamsProcessorFunction.java | 18 +
.../storm/trident/StreamsProviderSpout.java | 20 +-
.../org/apache/streams/util/ComponentUtils.java | 18 +
.../java/org/apache/streams/util/DateUtil.java | 18 +
.../apache/streams/util/SerializationUtil.java | 18 +
189 files changed, 15837 insertions(+), 1172 deletions(-)
----------------------------------------------------------------------
[11/18] git commit: Removed commented out duplicate code
Posted by sb...@apache.org.
Removed commented out duplicate code
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/62aab459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/62aab459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/62aab459
Branch: refs/heads/STREAMS-134
Commit: 62aab4593027904adce63c1c7cc17d468d141e18
Parents: b7bf172
Author: rebanks <re...@w2odigital.com>
Authored: Tue Jul 8 14:34:46 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Tue Jul 8 14:34:46 2014 -0500
----------------------------------------------------------------------
.../DatasiftTweetActivitySerializer.java | 45 --------------------
1 file changed, 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/62aab459/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 269f12a..7b9ccb3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -129,51 +129,6 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
return actor;
}
- //Need to make retweet user and tweet user the same object.
-// public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
-//
-// actor.setDisplayName(user.getName());
-// actor.setId(formatId(Optional.fromNullable(
-// user.getIdStr())
-// .or(Optional.of(user.getId().toString()))
-// .orNull()));
-// actor.setSummary(user.getDescription());
-// try {
-// actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
-// } catch (Exception e) {
-// LOGGER.warn("Exception trying to parse date : {}", e);
-// }
-//
-// if(user.getUrl() != null) {
-// actor.setUrl(user.getUrl());
-// }
-//
-// Map<String, Object> extensions = new HashMap<String,Object>();
-// extensions.put("location", user.getLocation());
-// extensions.put("posts", user.getStatusesCount());
-// extensions.put("followers", user.getFollowersCount());
-// extensions.put("screenName", user.getScreenName());
-// if(user.getAdditionalProperties() != null) {
-// extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
-// }
-//
-// Image profileImage = new Image();
-// String profileUrl = null;
-// if(actor.getImage() == null && user.getAdditionalProperties() != null) {
-// Object url = user.getAdditionalProperties().get("profile_image_url_https");
-// if(url instanceof String)
-// profileUrl = (String) url;
-// }
-// if(actor.getImage() == null && profileUrl == null) {
-// profileUrl = user.getProfileImageUrl();
-// }
-// profileImage.setUrl(profileUrl);
-// actor.setImage(profileImage);
-//
-// actor.setAdditionalProperty("extensions", extensions);
-// return actor;
-// }
-
public void addLocationExtension(Activity activity, Twitter twitter) {
Map<String, Object> extensions = ensureExtensions(activity);
Map<String, Object> location = Maps.newHashMap();
[07/18] git commit: STREAMS-105 | Code review feedback
Posted by sb...@apache.org.
STREAMS-105 | Code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/26f3614f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/26f3614f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/26f3614f
Branch: refs/heads/STREAMS-134
Commit: 26f3614fd7d72be4526fc9589e75dfb32dda6588
Parents: 949b676
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Wed Jul 2 10:11:16 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Wed Jul 2 10:11:16 2014 -0500
----------------------------------------------------------------------
.../datasift/serializer/DatasiftDefaultActivitySerializer.java | 2 +-
.../datasift/serializer/DatasiftTweetActivitySerializer.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26f3614f/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index 09126dc..4095df6 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -174,7 +174,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
ActivityObject actObj = new ActivityObject();
actObj.setObjectType(interaction.getContenttype());
actObj.setUrl(interaction.getLink());
- actObj.setId(interaction.getId());
+ actObj.setId(formatId("post",interaction.getId()));
actObj.setContent(interaction.getContent());
return actObj;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26f3614f/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 1fecc4b..0482235 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -165,7 +165,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
Map<String, Object> coords = Maps.newHashMap();
coords.put("coordinates", coordiantes);
- coords.put("type", "geo_point");
+ coords.put("type", "point");
location.put("coordinates", coords);
extensions.put("location", location);
}
[02/18] git commit: Merge pull request #5 from apache/master
Posted by sb...@apache.org.
Merge pull request #5 from apache/master
Merge From Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d71b159d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d71b159d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d71b159d
Branch: refs/heads/STREAMS-134
Commit: d71b159d1996a939cccede70b509c1767478aa72
Parents: 2ee3157 34c95a6
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Fri Jun 27 16:43:57 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Fri Jun 27 16:43:57 2014 -0500
----------------------------------------------------------------------
pom.xml | 2 +-
streams-contrib/streams-provider-google/google-gmail/pom.xml | 2 +-
.../apache/streams/sysomos/provider/SysomosHeartbeatStream.java | 4 +++-
streams-contrib/streams-provider-twitter/pom.xml | 5 +++--
4 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[13/18] git commit: Merge branch 'master' into STREAMS-105
Posted by sb...@apache.org.
Merge branch 'master' into STREAMS-105
Conflicts:
streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fbb8ccff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fbb8ccff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fbb8ccff
Branch: refs/heads/STREAMS-134
Commit: fbb8ccffc0db8de0f2d591d1e6a3251b6ff96053
Parents: abaf5b0 5c9a531
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Thu Jul 10 10:24:27 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Thu Jul 10 10:24:27 2014 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/README.md | 38 +
.../ElasticsearchConfigurator.java | 18 +
.../ElasticsearchPersistReader.java | 3 +-
.../ElasticsearchPersistUpdater.java | 35 -
.../ElasticsearchPersistWriter.java | 705 +++++++------------
.../ElasticsearchPersistWriterTask.java | 56 --
.../elasticsearch/ElasticsearchQuery.java | 34 +-
.../ElasticsearchReaderConfiguration.json | 5 +
.../ElasticsearchWriterConfiguration.json | 5 +
.../src/main/resources/reference.json | 9 +
streams-contrib/streams-persist-mongo/README.md | 16 +
.../streams/mongo/MongoPersistReader.java | 270 +++++++
.../src/main/resources/reference.json | 8 +
.../src/main/resources/reference.properties | 10 -
.../DatasiftTypeConverterProcessor.java | 12 +-
.../DatasiftTweetActivitySerializer.java | 180 ++---
.../provider/SysomosHeartbeatStream.java | 12 +-
.../sysomos/provider/SysomosProvider.java | 4 +-
.../twitter/processor/TwitterTypeConverter.java | 2 +-
.../TwitterJsonActivitySerializer.java | 3 +
.../TwitterJsonUserActivitySerializer.java | 72 ++
.../serializer/util/TwitterActivityUtil.java | 22 +
.../streams/local/builders/StreamComponent.java | 2 +-
.../local/tasks/StreamsProviderTask.java | 54 +-
.../local/builders/LocalStreamBuilderTest.java | 52 +-
.../local/test/processors/SlowProcessor.java | 50 ++
.../test/providers/EmptyResultSetProvider.java | 2 +-
.../test/providers/NumericMessageProvider.java | 63 +-
.../test/component/FileReaderProvider.java | 52 +-
29 files changed, 1024 insertions(+), 770 deletions(-)
----------------------------------------------------------------------
[10/18] git commit: Created DatasiftTwitterUser object
Posted by sb...@apache.org.
Created DatasiftTwitterUser object
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7bf172a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7bf172a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7bf172a
Branch: refs/heads/STREAMS-134
Commit: b7bf172a623fa484e8068abe46ca0d39ed693593
Parents: c50ce91
Author: rebanks <re...@w2odigital.com>
Authored: Tue Jul 8 14:33:25 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Tue Jul 8 14:33:25 2014 -0500
----------------------------------------------------------------------
.../DatasiftTweetActivitySerializer.java | 92 +++++-----
.../main/jsonschema/com/datasift/Datasift.json | 179 +------------------
.../com/datasift/DatasiftTwitterUser.json | 61 +++++++
3 files changed, 113 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3406eb4..269f12a 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -25,7 +25,7 @@ import com.google.common.collect.Maps;
import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.interaction.Interaction;
-import org.apache.streams.datasift.interaction.User;
+import org.apache.streams.datasift.twitter.DatasiftTwitterUser;
import org.apache.streams.datasift.twitter.Retweet;
import org.apache.streams.datasift.twitter.Twitter;
import org.apache.streams.pojo.json.Activity;
@@ -80,10 +80,10 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
}
public Actor buildActor(Datasift event, Twitter twitter) {
- User user = twitter.getUser();
+ DatasiftTwitterUser user = twitter.getUser();
Actor actor = super.buildActor(event.getInteraction());
if(user == null) {
- return retweetBuildActor(actor, twitter.getRetweet().getUser());
+ user = twitter.getRetweet().getUser();
}
actor.setDisplayName(user.getName());
@@ -130,49 +130,49 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
}
//Need to make retweet user and tweet user the same object.
- public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
-
- actor.setDisplayName(user.getName());
- actor.setId(formatId(Optional.fromNullable(
- user.getIdStr())
- .or(Optional.of(user.getId().toString()))
- .orNull()));
- actor.setSummary(user.getDescription());
- try {
- actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
- } catch (Exception e) {
- LOGGER.warn("Exception trying to parse date : {}", e);
- }
-
- if(user.getUrl() != null) {
- actor.setUrl(user.getUrl());
- }
-
- Map<String, Object> extensions = new HashMap<String,Object>();
- extensions.put("location", user.getLocation());
- extensions.put("posts", user.getStatusesCount());
- extensions.put("followers", user.getFollowersCount());
- extensions.put("screenName", user.getScreenName());
- if(user.getAdditionalProperties() != null) {
- extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
- }
-
- Image profileImage = new Image();
- String profileUrl = null;
- if(actor.getImage() == null && user.getAdditionalProperties() != null) {
- Object url = user.getAdditionalProperties().get("profile_image_url_https");
- if(url instanceof String)
- profileUrl = (String) url;
- }
- if(actor.getImage() == null && profileUrl == null) {
- profileUrl = user.getProfileImageUrl();
- }
- profileImage.setUrl(profileUrl);
- actor.setImage(profileImage);
-
- actor.setAdditionalProperty("extensions", extensions);
- return actor;
- }
+// public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
+//
+// actor.setDisplayName(user.getName());
+// actor.setId(formatId(Optional.fromNullable(
+// user.getIdStr())
+// .or(Optional.of(user.getId().toString()))
+// .orNull()));
+// actor.setSummary(user.getDescription());
+// try {
+// actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
+// } catch (Exception e) {
+// LOGGER.warn("Exception trying to parse date : {}", e);
+// }
+//
+// if(user.getUrl() != null) {
+// actor.setUrl(user.getUrl());
+// }
+//
+// Map<String, Object> extensions = new HashMap<String,Object>();
+// extensions.put("location", user.getLocation());
+// extensions.put("posts", user.getStatusesCount());
+// extensions.put("followers", user.getFollowersCount());
+// extensions.put("screenName", user.getScreenName());
+// if(user.getAdditionalProperties() != null) {
+// extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
+// }
+//
+// Image profileImage = new Image();
+// String profileUrl = null;
+// if(actor.getImage() == null && user.getAdditionalProperties() != null) {
+// Object url = user.getAdditionalProperties().get("profile_image_url_https");
+// if(url instanceof String)
+// profileUrl = (String) url;
+// }
+// if(actor.getImage() == null && profileUrl == null) {
+// profileUrl = user.getProfileImageUrl();
+// }
+// profileImage.setUrl(profileUrl);
+// actor.setImage(profileImage);
+//
+// actor.setAdditionalProperty("extensions", extensions);
+// return actor;
+// }
public void addLocationExtension(Activity activity, Twitter twitter) {
Map<String, Object> extensions = ensureExtensions(activity);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index e7b664a..8d7ff3a 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -947,62 +947,8 @@
"type": "string"
},
"user": {
- "javaType": "org.apache.streams.datasift.twitter.User",
- "type": "object",
- "dynamic": "true",
- "properties": {
- "created_at": {
- "type": "string"
- },
- "description": {
- "type": "string"
- },
- "followers_count": {
- "type": "integer"
- },
- "friends_count": {
- "type": "integer"
- },
- "geo_enabled": {
- "type": "boolean"
- },
- "id": {
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "lang": {
- "type": "string"
- },
- "listed_count": {
- "type": "integer"
- },
- "location": {
- "type": "string"
- },
- "name": {
- "type": "string"
- },
- "profile_image_url": {
- "type": "string"
- },
- "screen_name": {
- "type": "string"
- },
- "statuses_count": {
- "type": "integer"
- },
- "time_zone": {
- "type": "string"
- },
- "url": {
- "type": "string"
- },
- "utc_offset": {
- "type": "integer"
- }
- }
+ "type": "DatasiftTwitterUser",
+ "$ref": "./DatasiftTwitterUser.json"
}
}
},
@@ -1062,64 +1008,8 @@
"type": "string"
},
"user": {
- "javaType": "org.apache.streams.datasift.twitter.User",
- "dynamic": "true",
- "properties": {
- "created_at": {
- "type": "string"
- },
- "description": {
- "type": "string"
- },
- "followers_count": {
- "type": "integer"
- },
- "friends_count": {
- "type": "integer"
- },
- "geo_enabled": {
- "type": "boolean"
- },
- "id": {
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "lang": {
- "type": "string"
- },
- "listed_count": {
- "type": "integer"
- },
- "location": {
- "type": "string"
- },
- "name": {
- "type": "string"
- },
- "profile_image_url": {
- "type": "string"
- },
- "screen_name": {
- "type": "string"
- },
- "statuses_count": {
- "type": "integer"
- },
- "time_zone": {
- "type": "string"
- },
- "url": {
- "type": "string"
- },
- "utc_offset": {
- "type": "integer"
- },
- "verified": {
- "type": "boolean"
- }
- }
+ "type": "DatasiftTwitterUser",
+ "$ref": "./DatasiftTwitterUser.json"
}
}
},
@@ -1130,65 +1020,8 @@
"type": "string"
},
"user": {
- "javaType": "org.apache.streams.datasift.interaction.User",
- "type": "object",
- "dynamic": "true",
- "properties": {
- "created_at": {
- "type": "string"
- },
- "description": {
- "type": "string"
- },
- "followers_count": {
- "type": "integer"
- },
- "friends_count": {
- "type": "integer"
- },
- "geo_enabled": {
- "type": "boolean"
- },
- "id": {
- "type": "integer"
- },
- "id_str": {
- "type": "string"
- },
- "lang": {
- "type": "string"
- },
- "listed_count": {
- "type": "integer"
- },
- "location": {
- "type": "string"
- },
- "name": {
- "type": "string"
- },
- "profile_image_url": {
- "type": "string"
- },
- "screen_name": {
- "type": "string"
- },
- "statuses_count": {
- "type": "integer"
- },
- "time_zone": {
- "type": "string"
- },
- "url": {
- "type": "string"
- },
- "utc_offset": {
- "type": "integer"
- },
- "verified": {
- "type": "boolean"
- }
- }
+ "type": "DatasiftTwitterUser",
+ "$ref": "./DatasiftTwitterUser.json"
}
}
},
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7bf172a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
new file mode 100644
index 0000000..97d93fe
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/DatasiftTwitterUser.json
@@ -0,0 +1,61 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "javaType": "org.apache.streams.datasift.twitter.DatasiftTwitterUser",
+ "properties": {
+ "created_at": {
+ "type": "string"
+ },
+ "description": {
+ "type": "string"
+ },
+ "followers_count": {
+ "type": "integer"
+ },
+ "friends_count": {
+ "type": "integer"
+ },
+ "geo_enabled": {
+ "type": "boolean"
+ },
+ "id": {
+ "type": "integer"
+ },
+ "id_str": {
+ "type": "string"
+ },
+ "lang": {
+ "type": "string"
+ },
+ "listed_count": {
+ "type": "integer"
+ },
+ "location": {
+ "type": "string"
+ },
+ "name": {
+ "type": "string"
+ },
+ "profile_image_url": {
+ "type": "string"
+ },
+ "screen_name": {
+ "type": "string"
+ },
+ "statuses_count": {
+ "type": "integer"
+ },
+ "time_zone": {
+ "type": "string"
+ },
+ "url": {
+ "type": "string"
+ },
+ "utc_offset": {
+ "type": "integer"
+ },
+ "verified": {
+ "type": "boolean"
+ }
+ }
+}
\ No newline at end of file
[06/18] git commit: Merge pull request #6 from apache/master
Posted by sb...@apache.org.
Merge pull request #6 from apache/master
Merge Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3b98df84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3b98df84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3b98df84
Branch: refs/heads/STREAMS-134
Commit: 3b98df844ca87e79bf9cb88f16fbad361d9959ad
Parents: d71b159 ba96627
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Tue Jul 1 09:56:26 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Tue Jul 1 09:56:26 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +
.../ElasticsearchPersistUpdater.java | 35 -
.../ElasticsearchPersistWriter.java | 705 +++++++------------
.../ElasticsearchPersistWriterTask.java | 56 --
.../ElasticsearchWriterConfiguration.json | 5 +
.../provider/SysomosHeartbeatStream.java | 2 +-
.../local/tasks/StreamsProviderTask.java | 51 +-
.../local/builders/LocalStreamBuilderTest.java | 52 +-
.../local/test/processors/SlowProcessor.java | 50 ++
.../test/providers/EmptyResultSetProvider.java | 2 +-
.../test/providers/NumericMessageProvider.java | 63 +-
.../test/component/FileReaderProvider.java | 52 +-
12 files changed, 419 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
[14/18] git commit: Merge PR50 for 'rbnks/STREAMS-127'
Posted by sb...@apache.org.
Merge PR50 for 'rbnks/STREAMS-127'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/853ae3c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/853ae3c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/853ae3c7
Branch: refs/heads/STREAMS-134
Commit: 853ae3c7ae708fecea790556ac3bfb6bc7078117
Parents: 38ed41a 62aab45
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 12:35:28 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 12:35:28 2014 -0400
----------------------------------------------------------------------
.../DatasiftTweetActivitySerializer.java | 51 +-----
.../main/jsonschema/com/datasift/Datasift.json | 179 +------------------
.../com/datasift/DatasiftTwitterUser.json | 61 +++++++
3 files changed, 70 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/853ae3c7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 05b579b,7b9ccb3..443aeec
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@@ -82,48 -79,11 +82,48 @@@ public class DatasiftTweetActivitySeria
return activity;
}
+ /**
+ * Get the links from this tweet as a list
+ * @param twitter
+ * @return the links from the tweet
+ */
+ public List<String> getLinks(Twitter twitter) {
+ return getLinks(twitter.getLinks());
+ }
+
+ /**
+ * Get the links from this tweet as a list
+ * @param retweet
+ * @return the links from the tweet
+ */
+ public List<String> getLinks(Retweet retweet) {
+ return getLinks(retweet.getLinks());
+ }
+
+ /**
+ * Converts the list of objects to a list of strings
+ * @param links
+ * @return
+ */
+ private List<String> getLinks(List<Object> links) {
+ if(links == null)
+ return Lists.newArrayList();
+ List<String> result = Lists.newLinkedList();
+ for(Object obj : links) {
+ if(obj instanceof String) {
+ result.add((String) obj);
+ } else {
+ LOGGER.warn("Links is not instance of String : {}", obj.getClass().getName());
+ }
+ }
+ return result;
+ }
+
public Actor buildActor(Datasift event, Twitter twitter) {
- User user = twitter.getUser();
+ DatasiftTwitterUser user = twitter.getUser();
Actor actor = super.buildActor(event.getInteraction());
if(user == null) {
- return retweetBuildActor(actor, twitter.getRetweet().getUser());
+ user = twitter.getRetweet().getUser();
}
actor.setDisplayName(user.getName());
[05/18] git commit: STREAMS-105 | Code review feedback
Posted by sb...@apache.org.
STREAMS-105 | Code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/949b6768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/949b6768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/949b6768
Branch: refs/heads/STREAMS-134
Commit: 949b676897aa87bba92613b694c4808efdea8c56
Parents: ce6e327
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 17:13:35 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 17:13:35 2014 -0500
----------------------------------------------------------------------
.../DatasiftTweetActivitySerializer.java | 29 ++++++--------------
1 file changed, 8 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/949b6768/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index a9514ed..1fecc4b 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -231,36 +231,23 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
- if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+ if(mentions != null && !mentions.isEmpty()) {
for(int x = 0; x < mentions.size(); x ++) {
UserMentions mention = new UserMentions();
-
- mention.setIdStr("id:twitter:" + mentionIds.get(x));
- mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
mention.setName(mentions.get(x));
mention.setScreenName(mentions.get(x));
userMentions.add(mention);
}
- } else if((mentions != null && !mentions.isEmpty()) || (mentionIds != null && !mentionIds.isEmpty())) {
- if(mentions != null && !mentions.isEmpty()) {
- for(int x = 0; x < mentions.size(); x ++) {
- UserMentions mention = new UserMentions();
- mention.setName(mentions.get(x));
- mention.setScreenName(mentions.get(x));
-
- userMentions.add(mention);
- }
- }
- if(mentionIds != null && !mentionIds.isEmpty()) {
- for(int x = 0; x < mentionIds.size(); x ++) {
- UserMentions mention = new UserMentions();
+ }
+ if(mentionIds != null && !mentionIds.isEmpty()) {
+ for(int x = 0; x < mentionIds.size(); x ++) {
+ UserMentions mention = new UserMentions();
- mention.setIdStr("id:twitter:" + mentionIds.get(x));
- mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+ mention.setIdStr("id:twitter:" + mentionIds.get(x));
+ mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
- userMentions.add(mention);
- }
+ userMentions.add(mention);
}
}
[03/18] git commit: STREAMS-105 | Ensured that the user_mentions
entries are correctly structured and that the activity object has the correct
ID and content
Posted by sb...@apache.org.
STREAMS-105 | Ensured that the user_mentions entries are correctly structured and that the activity object has the correct ID and content
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c35c2ef7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c35c2ef7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c35c2ef7
Branch: refs/heads/STREAMS-134
Commit: c35c2ef74df06779d50bdf80a79823ade57ac1c4
Parents: d71b159
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Fri Jun 27 16:46:07 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Fri Jun 27 16:46:07 2014 -0500
----------------------------------------------------------------------
.../DatasiftDefaultActivitySerializer.java | 7 +++--
.../DatasiftTweetActivitySerializer.java | 30 ++++++++++++--------
2 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c35c2ef7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
index b8c87e5..09126dc 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java
@@ -22,8 +22,8 @@ import java.util.Map;
import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
/**
-*
-*/
+ *
+ */
public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class);
@@ -174,6 +174,9 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat
ActivityObject actObj = new ActivityObject();
actObj.setObjectType(interaction.getContenttype());
actObj.setUrl(interaction.getLink());
+ actObj.setId(interaction.getId());
+ actObj.setContent(interaction.getContent());
+
return actObj;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c35c2ef7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 3406eb4..465196f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -31,10 +31,12 @@ import org.apache.streams.datasift.twitter.Twitter;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.Actor;
import org.apache.streams.pojo.json.Image;
+import org.apache.streams.twitter.pojo.UserMentions;
import org.apache.streams.twitter.serializer.util.TwitterActivityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -204,8 +206,8 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
}
}
}
- extensions.put("hashtags", hashTags);
+ extensions.put("hashtags", hashTags);
if(retweet != null) {
Map<String, Object> rebroadcasts = Maps.newHashMap();
@@ -215,18 +217,22 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
}
if(interaction.getAdditionalProperties() != null) {
- Object mentionsObject = interaction.getAdditionalProperties().get("mentions");
- if(mentionsObject != null ) {
- if(mentionsObject instanceof List) {
- List mentions = (List) mentionsObject;
- List<Map<String, Object>> userMentions = Lists.newLinkedList();
- for(Object mention : mentions) {
- Map<String, Object> actor = Maps.newHashMap();
- actor.put("displayName", mention);
- userMentions.add(actor);
- }
- extensions.put("user_mentions", userMentions);
+ ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+ ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+ ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
+
+ if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+ for(int x = 0; x < mentions.size(); x ++) {
+ UserMentions mention = new UserMentions();
+
+ mention.setIdStr("id:twitter:" + mentionIds.get(x));
+ mention.setId(mentionIds.get(x));
+ mention.setName(mentions.get(x));
+ mention.setScreenName(mentions.get(x));
+
+ userMentions.add(mention);
}
+ extensions.put("user_mentions", userMentions);
}
}
extensions.put("keywords", interaction.getContent());
[15/18] git commit: Merge PR47 branch 'rdouglas/STREAMS-105'
Posted by sb...@apache.org.
Merge PR47 branch 'rdouglas/STREAMS-105'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b7e4c346
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b7e4c346
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b7e4c346
Branch: refs/heads/STREAMS-134
Commit: b7e4c34672238bbcd5a28cede7c4965e4e9ab49c
Parents: 853ae3c fbb8ccf
Author: mfranklin <mf...@apache.org>
Authored: Mon Jul 14 13:01:41 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Mon Jul 14 13:01:41 2014 -0400
----------------------------------------------------------------------
.../serializer/DatasiftDefaultActivitySerializer.java | 7 +++++--
.../src/main/jsonschema/com/datasift/Datasift.json | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b7e4c346/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
[04/18] git commit: STREAMS-105 | Code review feedback
Posted by sb...@apache.org.
STREAMS-105 | Code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ce6e3276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ce6e3276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ce6e3276
Branch: refs/heads/STREAMS-134
Commit: ce6e3276e113251b15c4655335a6887082e13315
Parents: c35c2ef
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jun 30 11:38:32 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jun 30 11:38:32 2014 -0500
----------------------------------------------------------------------
.../DatasiftTweetActivitySerializer.java | 141 +++++++++++--------
.../main/jsonschema/com/datasift/Datasift.json | 2 +-
2 files changed, 86 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6e3276/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 465196f..a9514ed 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -81,58 +81,32 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
return activity;
}
+ /**
+ * Returns the actor created from this particular event and Twitter object
+ *
+ * @param event
+ * @param twitter
+ * @return
+ */
public Actor buildActor(Datasift event, Twitter twitter) {
User user = twitter.getUser();
Actor actor = super.buildActor(event.getInteraction());
- if(user == null) {
- return retweetBuildActor(actor, twitter.getRetweet().getUser());
- }
-
- actor.setDisplayName(user.getName());
- actor.setId(formatId(Optional.fromNullable(
- user.getIdStr())
- .or(Optional.of(user.getId().toString()))
- .orNull()));
- actor.setSummary(user.getDescription());
- try {
- actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt()));
- } catch (Exception e) {
- LOGGER.warn("Exception trying to parse date : {}", e);
- }
- if(user.getUrl() != null) {
- actor.setUrl(user.getUrl());
- }
-
- Map<String, Object> extensions = new HashMap<String,Object>();
- extensions.put("location", user.getLocation());
- extensions.put("posts", user.getStatusesCount());
- extensions.put("followers", user.getFollowersCount());
- extensions.put("screenName", user.getScreenName());
- if(user.getAdditionalProperties() != null) {
- extensions.put("favorites", user.getAdditionalProperties().get("favourites_count"));
- }
-
- Image profileImage = new Image();
- String profileUrl = null;
- profileUrl = event.getInteraction().getAuthor().getAvatar();
- if(profileUrl == null && user.getAdditionalProperties() != null) {
- Object url = user.getAdditionalProperties().get("profile_image_url_https");
- if(url instanceof String)
- profileUrl = (String) url;
- }
- if(profileUrl == null) {
- profileUrl = user.getProfileImageUrl();
+ if (user == null) {
+ return userBuildActor(actor, twitter.getRetweet().getUser());
+ } else {
+ return userBuildActor(actor, user);
}
- profileImage.setUrl(profileUrl);
- actor.setImage(profileImage);
-
- actor.setAdditionalProperty("extensions", extensions);
- return actor;
}
- //Need to make retweet user and tweet user the same object.
- public Actor retweetBuildActor(Actor actor, org.apache.streams.datasift.twitter.User user) {
+ /**
+ * Build an actor object given a User
+ *
+ * @param actor
+ * @param user
+ * @return
+ */
+ private Actor userBuildActor(Actor actor, User user) {
actor.setDisplayName(user.getName());
actor.setId(formatId(Optional.fromNullable(
@@ -169,13 +143,22 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
if(actor.getImage() == null && profileUrl == null) {
profileUrl = user.getProfileImageUrl();
}
- profileImage.setUrl(profileUrl);
- actor.setImage(profileImage);
+
+ if(profileUrl != null) {
+ profileImage.setUrl(profileUrl);
+ actor.setImage(profileImage);
+ }
actor.setAdditionalProperty("extensions", extensions);
return actor;
}
+ /**
+ * Adds location attributes from the given Twitter object
+ *
+ * @param activity
+ * @param twitter
+ */
public void addLocationExtension(Activity activity, Twitter twitter) {
Map<String, Object> extensions = ensureExtensions(activity);
Map<String, Object> location = Maps.newHashMap();
@@ -187,6 +170,13 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
extensions.put("location", location);
}
+ /**
+ * Creates and adds Twitter Extensions from given interaction
+ *
+ * @param activity
+ * @param twitter
+ * @param interaction
+ */
public void addTwitterExtensions(Activity activity, Twitter twitter, Interaction interaction) {
Retweet retweet = twitter.getRetweet();
Map<String, Object> extensions = ensureExtensions(activity);
@@ -217,24 +207,63 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
}
if(interaction.getAdditionalProperties() != null) {
- ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
- ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
- ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
+ ArrayList<UserMentions> userMentions = createUserMentions(interaction);
+
+ if(userMentions.size() > 0)
+ extensions.put("user_mentions", userMentions);
+ }
+
+ extensions.put("keywords", interaction.getContent());
+ }
+
+ /**
+ * Returns an ArrayList of all UserMentions in this interaction
+ * Note: The ID list and the handle lists do not necessarily correspond 1:1 for this provider
+ * If those lists are the same size, then they will be merged into individual UserMention
+ * objects. However, if they are not the same size, a new UserMention object will be created
+ * for each entry in both lists.
+ *
+ * @param interaction
+ * @return
+ */
+ private ArrayList<UserMentions> createUserMentions(Interaction interaction) {
+ ArrayList<String> mentions = (ArrayList<String>) interaction.getAdditionalProperties().get("mentions");
+ ArrayList<Long> mentionIds = (ArrayList<Long>) interaction.getAdditionalProperties().get("mention_ids");
+ ArrayList<UserMentions> userMentions = new ArrayList<UserMentions>();
- if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+ if(mentions != null && mentionIds != null && (mentionIds.size() == mentions.size()) && !mentions.isEmpty() && !mentionIds.isEmpty()) {
+ for(int x = 0; x < mentions.size(); x ++) {
+ UserMentions mention = new UserMentions();
+
+ mention.setIdStr("id:twitter:" + mentionIds.get(x));
+ mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+ mention.setName(mentions.get(x));
+ mention.setScreenName(mentions.get(x));
+
+ userMentions.add(mention);
+ }
+ } else if((mentions != null && !mentions.isEmpty()) || (mentionIds != null && !mentionIds.isEmpty())) {
+ if(mentions != null && !mentions.isEmpty()) {
for(int x = 0; x < mentions.size(); x ++) {
UserMentions mention = new UserMentions();
-
- mention.setIdStr("id:twitter:" + mentionIds.get(x));
- mention.setId(mentionIds.get(x));
mention.setName(mentions.get(x));
mention.setScreenName(mentions.get(x));
userMentions.add(mention);
}
- extensions.put("user_mentions", userMentions);
+ }
+ if(mentionIds != null && !mentionIds.isEmpty()) {
+ for(int x = 0; x < mentionIds.size(); x ++) {
+ UserMentions mention = new UserMentions();
+
+ mention.setIdStr("id:twitter:" + mentionIds.get(x));
+ mention.setId(Long.parseLong(String.valueOf(mentionIds.get(x))));
+
+ userMentions.add(mention);
+ }
}
}
- extensions.put("keywords", interaction.getContent());
+
+ return userMentions;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ce6e3276/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
index e7b664a..c30fa8f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/com/datasift/Datasift.json
@@ -738,7 +738,7 @@
"type": "array",
"items": [
{
- "type": "integer"
+ "type": "string"
}
]
},
[17/18] git commit: percolate processor - tested with
streams-examples/elasticsearch-tag
Posted by sb...@apache.org.
percolate processor - tested with streams-examples/elasticsearch-tag
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7df265f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7df265f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7df265f5
Branch: refs/heads/STREAMS-134
Commit: 7df265f5436d167c0d80d3205b822f67c904bac1
Parents: b3d849d
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 20:57:28 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:40 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 28 +-
.../ElasticsearchPersistUpdater.java | 6 +-
.../elasticsearch/PercolateProcessor.java | 294 ---------------
.../processor/PercolateTagProcessor.java | 353 +++++++++++++++++++
.../ElasticsearchWriterConfiguration.json | 22 +-
streams-pojo/pom.xml | 6 +
.../org/apache/streams/data/util/JsonUtil.java | 60 +++-
7 files changed, 424 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 26033de..1c66789 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -78,28 +78,14 @@ public class ElasticsearchConfigurator {
public static ElasticsearchWriterConfiguration detectWriterConfiguration(Config elasticsearch) {
- ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch);
- ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchWriterConfiguration.class);
-
- String index = elasticsearch.getString("index");
- String type = elasticsearch.getString("type");
- Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
-
- if( elasticsearch.hasPath("bulk"))
- elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
-
- if( elasticsearch.hasPath("batchSize"))
- elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
-
- if( elasticsearch.hasPath("batchBytes"))
- elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
-
-
- elasticsearchWriterConfiguration.setIndex(index);
- elasticsearchWriterConfiguration.setType(type);
- elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
-
+ ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = null;
+ try {
+ elasticsearchWriterConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchWriterConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse elasticsearchwriterconfiguration");
+ }
return elasticsearchWriterConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..602172e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -128,14 +128,14 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl
private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private ElasticsearchConfiguration config;
+ private ElasticsearchWriterConfiguration config;
public ElasticsearchPersistUpdater() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
}
- public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
+ public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
this.config = config;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
deleted file mode 100644
index c5d9f4c..0000000
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.data.util.ActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * References:
- * Some helpful references to help
- * Purpose URL
- * ------------- ----------------------------------------------------------------
- * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
- * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
- * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
- */
-
-public class PercolateProcessor implements StreamsProcessor {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected Queue<StreamsDatum> inQueue;
- protected Queue<StreamsDatum> outQueue;
-
- public String TAGS_EXTENSION = "tags";
-
- private ElasticsearchWriterConfiguration config;
- private ElasticsearchClientManager manager;
- private BulkRequestBuilder bulkBuilder;
-
- public PercolateProcessor(ElasticsearchConfiguration config) {
- manager = new ElasticsearchClientManager(config);
- }
-
- public ElasticsearchClientManager getManager() {
- return manager;
- }
-
- public void setManager(ElasticsearchClientManager manager) {
- this.manager = manager;
- }
-
- public ElasticsearchConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(ElasticsearchWriterConfiguration config) {
- this.config = config;
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
-
- List<StreamsDatum> result = Lists.newArrayList();
-
- String json;
- ObjectNode node;
- // first check for valid json
- if (entry.getDocument() instanceof String) {
- json = (String) entry.getDocument();
- try {
- node = (ObjectNode) mapper.readTree(json);
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- } else {
- node = (ObjectNode) entry.getDocument();
- json = node.asText();
- }
-
- PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
-
- ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
-
- for (PercolateResponse.Match match : response.getMatches()) {
- tagArray.add(match.getId().string());
- }
-
- Activity activity = mapper.convertValue(node, Activity.class);
-
- Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
-
- extensions.put(TAGS_EXTENSION, tagArray);
-
- activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
-
- result.add(entry);
-
- return result;
-
- }
-
- @Override
- public void prepare(Object o) {
-
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(config.getTags());
- Preconditions.checkArgument(config.getTags().size() > 0);
-
- // consider using mapping to figure out what fields are included in _all
- //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
-
- deleteOldQueries(config.getIndex());
- for (Tag tag : config.getTags()) {
- PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
- queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
- addPercolateRule(queryBuilder, config.getIndex());
- }
- if (writePercolateRules() == true)
- LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
- else
- LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
-
-
- }
-
- @Override
- public void cleanUp() {
- manager.getClient().close();
- }
-
- public int numOfPercolateRules() {
- return this.bulkBuilder.numberOfActions();
- }
-
- public void addPercolateRule(PercolateQueryBuilder builder, String index) {
- this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
- }
-
- /**
- *
- * @return returns true if all rules were addded. False indicates one or more rules have failed.
- */
- public boolean writePercolateRules() {
- if(this.numOfPercolateRules() < 0) {
- throw new RuntimeException("No Rules Have been added!");
- }
- BulkResponse response = this.bulkBuilder.execute().actionGet();
- for(BulkItemResponse r : response.getItems()) {
- if(r.isFailed()) {
- System.out.println(r.getId()+"\t"+r.getFailureMessage());
- }
- }
- return !response.hasFailures();
- }
-
- /**
- *
- * @param ids
- * @param index
- * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
- */
- public boolean removeOldTags(Set<String> ids, String index) {
- if(ids.size() == 0) {
- return false;
- }
- BulkRequestBuilder bulk = manager.getClient().prepareBulk();
- for(String id : ids) {
- bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
- }
- return !bulk.execute().actionGet().hasFailures();
- }
-
- public Set<String> getActivePercolateTags(String index) {
- Set<String> tags = new HashSet<String>();
- SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
- SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
- SearchHits hits = response.getHits();
- for(SearchHit hit : hits.getHits()) {
- tags.add(hit.id());
- }
- return tags;
- }
-
- /**
- *
- * @param index
- * @return
- */
- public boolean deleteOldQueries(String index) {
- Set<String> tags = getActivePercolateTags(index);
- if(tags.size() == 0) {
- LOGGER.warn("No active tags were found in _percolator for index : {}", index);
- return false;
- }
- LOGGER.info("Deleting {} tags.", tags.size());
- BulkRequestBuilder bulk = manager.getClient().prepareBulk();
- for(String tag : tags) {
- bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
- }
- BulkResponse response =bulk.execute().actionGet();
- return !response.hasFailures();
- }
-
- public static class PercolateQueryBuilder {
-
- private BoolQueryBuilder queryBuilder;
- private String id;
-
- public PercolateQueryBuilder(String id) {
- this.id = id;
- this.queryBuilder = QueryBuilders.boolQuery();
- }
-
- public void setMinumumNumberShouldMatch(int shouldMatch) {
- this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
- }
-
-
- public void addQuery(String query, FilterLevel level, String... fields) {
- QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
- if(fields != null && fields.length > 0) {
- for(String field : fields) {
- builder.field(field);
- }
- }
- switch (level) {
- case MUST:
- this.queryBuilder.must(builder);
- break;
- case SHOULD:
- this.queryBuilder.should(builder);
- break;
- case MUST_NOT:
- this.queryBuilder.mustNot(builder);
- }
- }
-
- public String getId() {
- return this.id;
- }
-
- public String getSource() {
- return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
- }
-
-
- }
-
- public enum FilterLevel {
- MUST, SHOULD, MUST_NOT
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
new file mode 100644
index 0000000..075d2b2
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.elasticsearch.ElasticsearchClientManager;
+import org.apache.streams.elasticsearch.ElasticsearchConfiguration;
+import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.percolate.PercolateRequestBuilder;
+import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * References:
+ * Some helpful references to help
+ * Purpose URL
+ * ------------- ----------------------------------------------------------------
+ * [Status Codes] http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
+ * [Test Cases] http://greenbytes.de/tech/tc/httpredirects/
+ * [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
+ */
+
+public class PercolateTagProcessor implements StreamsProcessor {
+
+ public static final String STREAMS_ID = "PercolateTagProcessor";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected Queue<StreamsDatum> inQueue;
+ protected Queue<StreamsDatum> outQueue;
+
+ public String TAGS_EXTENSION = "tags";
+
+ private ElasticsearchWriterConfiguration config;
+ private ElasticsearchClientManager manager;
+ private BulkRequestBuilder bulkBuilder;
+
+ public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
+ this.config = config;
+ manager = new ElasticsearchClientManager(config);
+ }
+
+ public ElasticsearchClientManager getManager() {
+ return manager;
+ }
+
+ public void setManager(ElasticsearchClientManager manager) {
+ this.manager = manager;
+ }
+
+ public ElasticsearchConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(ElasticsearchWriterConfiguration config) {
+ this.config = config;
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+    /**
+     * Percolates the datum's document against the queries registered for the configured index and
+     * records the ids of the matching queries under the TAGS_EXTENSION key of the activity extensions.
+     *
+     * @param entry datum whose document is either a JSON string or an ObjectNode
+     * @return a one-element list holding entry with its document replaced by the tagged Activity,
+     *         or null when the document cannot be parsed/serialized or the percolate request fails
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        String json;
+        ObjectNode node;
+        // Both the tree form and the string form of the document are needed below.
+        if (entry.getDocument() instanceof String) {
+            json = (String) entry.getDocument();
+            try {
+                node = (ObjectNode) mapper.readTree(json);
+            } catch (IOException e) {
+                LOGGER.warn("Invalid datum: {}", json, e);
+                return null;
+            }
+        } else {
+            node = (ObjectNode) entry.getDocument();
+            try {
+                json = mapper.writeValueAsString(node);
+            } catch (JsonProcessingException e) {
+                LOGGER.warn("Invalid datum: {}", node, e);
+                return null;
+            }
+        }
+
+        // The request body wraps the candidate document in a "doc" field.
+        String percolateRequestJson = "{ \"doc\": " + json + "}";
+
+        PercolateResponse response;
+        try {
+            LOGGER.trace("Percolate request json: {}", percolateRequestJson);
+            PercolateRequestBuilder request = manager.getClient().preparePercolate()
+                    .setIndices(config.getIndex())
+                    .setDocumentType(config.getType())
+                    .setSource(percolateRequestJson);
+            // Only serialize the request for logging when trace is on, so a logging failure cannot drop the datum.
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
+            }
+            response = request.execute().actionGet();
+            LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
+        } catch (Exception e) {
+            // Boundary catch: any percolate failure is logged with its cause and reported as null.
+            LOGGER.warn("Percolate exception: {}", e.getMessage(), e);
+            return null;
+        }
+
+        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
+        for (PercolateResponse.Match match : response) {
+            tagArray.add(match.getId().string());
+        }
+        LOGGER.trace("Percolate matches: {}", tagArray);
+
+        Activity activity = mapper.convertValue(node, Activity.class);
+        Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+        extensions.put(TAGS_EXTENSION, tagArray);
+        activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
+        entry.setDocument(activity);
+
+        List<StreamsDatum> result = Lists.newArrayList();
+        result.add(entry);
+        return result;
+    }
+
+    /**
+     * Checks that the client and tag configuration are present, then indexes one percolate query per
+     * configured tag (property name as query id, property value as query string) via a bulk request.
+     * @param o not used by this implementation
+     */
+    @Override
+    public void prepare(Object o) {
+        Preconditions.checkNotNull(manager);
+        Preconditions.checkNotNull(manager.getClient());
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getTags());
+        Preconditions.checkArgument(config.getTags().getAdditionalProperties().size() > 0);
+
+        // consider using mapping to figure out what fields are included in _all
+        bulkBuilder = manager.getClient().prepareBulk();
+        for (String tagId : config.getTags().getAdditionalProperties().keySet()) {
+            Object tagQuery = config.getTags().getAdditionalProperties().get(tagId);
+            addPercolateRule(new PercolateQueryBuilder(tagId, (String) tagQuery), config.getIndex());
+        }
+
+        if (writePercolateRules()) {
+            LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+        } else {
+            LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
+        }
+    }
+
+ @Override
+ public void cleanUp() {
+ deleteOldQueries(config.getIndex());
+ manager.getClient().close();
+ }
+
+ public int numOfPercolateRules() {
+ return this.bulkBuilder.numberOfActions();
+ }
+
+ public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+ this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
+ .setSource(builder.getSource()));
+ }
+
+    /**
+     * Executes the queued bulk request and logs each failed item; throws RuntimeException if no rule is queued.
+     * @return returns true if all rules were added. False indicates one or more rules have failed.
+     */
+    public boolean writePercolateRules() {
+        if(this.numOfPercolateRules() < 1) {
+            throw new RuntimeException("No Rules Have been added!");
+        }
+        BulkResponse response = this.bulkBuilder.execute().actionGet();
+        for(BulkItemResponse r : response.getItems()) {
+            if(r.isFailed()) {
+                LOGGER.warn("Failed to write percolate rule {}: {}", r.getId(), r.getFailureMessage());
+            }
+        }
+        return !response.hasFailures();
+    }
+
+    /**
+     * Deletes the given percolate queries from the .percolator type of an index in one bulk request.
+     * @param ids ids of the percolate queries to delete
+     * @param index index whose percolate queries are deleted
+     * @return Returns true if all of the old tags were removed. False indicates ids was empty or one or more tags were not removed.
+     */
+    public boolean removeOldTags(Set<String> ids, String index) {
+        if(ids.size() == 0) {
+            return false;
+        }
+        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+        for(String id : ids) {
+            bulk.add(manager.getClient().prepareDelete(index, ".percolator", id));
+        }
+        return !bulk.execute().actionGet().hasFailures();
+    }
+
+    /** Collects the ids of up to 1000 documents stored under the .percolator type of the given index. */
+    public Set<String> getActivePercolateTags(String index) {
+        SearchRequestBuilder request = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
+        request.setQuery(QueryBuilders.matchAllQuery());
+        Set<String> activeIds = new HashSet<String>();
+        for (SearchHit hit : request.execute().actionGet().getHits().getHits()) {
+            activeIds.add(hit.id());
+        }
+        return activeIds;
+    }
+
+    /**
+     * Bulk-deletes every query that getActivePercolateTags(index) currently reports for the index.
+     * @param index index whose percolate queries are removed
+     * @return true if the bulk delete had no failures; false if it had, or if no queries were found
+     */
+    public boolean deleteOldQueries(String index) {
+        Set<String> activeIds = getActivePercolateTags(index);
+        if (activeIds.isEmpty()) {
+            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+            return false;
+        }
+        LOGGER.info("Deleting {} tags.", activeIds.size());
+        BulkRequestBuilder deletions = manager.getClient().prepareBulk();
+        for (String id : activeIds) {
+            deletions.add(manager.getClient().prepareDelete(index, ".percolator", id));
+        }
+        BulkResponse outcome = deletions.execute().actionGet();
+        return !outcome.hasFailures();
+    }
+
+// public static class PercolateQueryBuilder {
+//
+// private BoolQueryBuilder queryBuilder;
+// private String id;
+//
+// public PercolateQueryBuilder(String id) {
+// this.id = id;
+// this.queryBuilder = QueryBuilders.boolQuery();
+// }
+//
+// public void setMinumumNumberShouldMatch(int shouldMatch) {
+// this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+// }
+//
+//
+// public void addQuery(String query, FilterLevel level, String... fields) {
+// QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+// if(fields != null && fields.length > 0) {
+// for(String field : fields) {
+// builder.field(field);
+// }
+// }
+// switch (level) {
+// case MUST:
+// this.queryBuilder.must(builder);
+// break;
+// case SHOULD:
+// this.queryBuilder.should(builder);
+// break;
+// case MUST_NOT:
+// this.queryBuilder.mustNot(builder);
+// }
+// }
+//
+// public String getId() {
+// return this.id;
+// }
+//
+// public String getSource() {
+// return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+// }
+//
+//
+// }
+
+ public static class PercolateQueryBuilder {
+
+ private QueryStringQueryBuilder queryBuilder;
+ private String id;
+
+ public PercolateQueryBuilder(String id, String query) {
+ this.id = id;
+ this.queryBuilder = QueryBuilders.queryString(query);
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ public String getSource() {
+ return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+ }
+
+ }
+
+
+ public enum FilterLevel {
+ MUST, SHOULD, MUST_NOT
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index bf7b146..cd23fe2 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -32,23 +32,13 @@
"maxTimeBetweenFlushMs": {
"type": "integer"
},
+ "script": {
+ "type": "string",
+ "description": "Script to execute during index"
+ },
"tags": {
- "type": "array",
- "description": "Tags to apply",
- "items": {
- "type": "object",
- "description": "Tag to apply",
- "properties": {
- "id": {
- "type": "string",
- "description": "Tag identifier"
- },
- "query": {
- "type": "string",
- "description": "Tag query"
- }
- }
- }
+ "type": "object",
+ "description": "Tags to apply during index"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/pom.xml
----------------------------------------------------------------------
diff --git a/streams-pojo/pom.xml b/streams-pojo/pom.xml
index a3a12f6..ca6d953 100644
--- a/streams-pojo/pom.xml
+++ b/streams-pojo/pom.xml
@@ -85,6 +85,12 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7df265f5/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
index d49ef2a..79ac555 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/JsonUtil.java
@@ -24,9 +24,15 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import java.io.*;
-import java.util.List;
+import java.util.*;
/**
* JSON utilities
@@ -35,10 +41,10 @@ public class JsonUtil {
private JsonUtil() {}
- public static JsonNode jsonToJsonNode(String json) {
- ObjectMapper mapper = new ObjectMapper();
- JsonFactory factory = mapper.getFactory();
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static JsonFactory factory = mapper.getFactory();
+ public static JsonNode jsonToJsonNode(String json) {
JsonNode node;
try {
JsonParser jp = factory.createJsonParser(json);
@@ -50,7 +56,6 @@ public class JsonUtil {
}
public static String jsonNodeToJson(JsonNode node) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(node);
} catch (JsonProcessingException e) {
@@ -59,7 +64,6 @@ public class JsonUtil {
}
public static <T> T jsonToObject(String json, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, clazz);
} catch (IOException e) {
@@ -68,22 +72,18 @@ public class JsonUtil {
}
public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(node, clazz);
}
public static <T> JsonNode objectToJsonNode(T obj) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.valueToTree(obj);
}
public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) {
- ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(node, new TypeReference<List<T>>() {});
}
public static <T> String objectToJson(T object) {
- ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(object);
} catch (IOException e) {
@@ -96,7 +96,6 @@ public class JsonUtil {
}
public static JsonNode getFromFile(String filePath) {
- ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = mapper.getFactory(); // since 2.1 use mapper.getFactory() instead
JsonNode node = null;
@@ -123,4 +122,43 @@ public class JsonUtil {
return stream;
}
+
+    /**
+     * Returns the array at a dot-separated path, creating it (and any missing parent objects) if absent.
+     * @param node root object the path is resolved against
+     * @param field dot-separated path of the array, e.g. "extensions.tags"
+     * @return the existing or newly created array at that path
+     */
+    public static ArrayNode ensureArray(ObjectNode node, String field) {
+        int lastDot = field.lastIndexOf('.');
+        // Everything before the last dot names the parent object; the remainder names the array itself.
+        ObjectNode parent = lastDot < 0 ? node : ensureObject(node, field.substring(0, lastDot));
+        String leaf = field.substring(lastDot + 1);
+        JsonNode existing = parent.get(leaf);
+        if (existing == null) {
+            return parent.putArray(leaf);
+        }
+        // A non-array value already stored here is a caller error and surfaces as ClassCastException.
+        return (ArrayNode) existing;
+    }
+
+    /**
+     * Returns the object at a dot-separated path, creating each missing object along the way.
+     * @param node root object the path is resolved against
+     * @param field dot-separated path of the object, e.g. "extensions.location"
+     * @return the existing or newly created object at that path
+     */
+    public static ObjectNode ensureObject(ObjectNode node, String field) {
+        ObjectNode current = node;
+        for (String segment : Splitter.on('.').split(field)) {
+            JsonNode child = current.get(segment);
+            if (child == null) {
+                child = current.putObject(segment);
+            }
+            // A non-object value on the path is a caller error and surfaces as ClassCastException.
+            current = (ObjectNode) child;
+        }
+        return current;
+    }
+
}
[18/18] git commit: Merge branch 'STREAMS-134' of
https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134
Posted by sb...@apache.org.
Merge branch 'STREAMS-134' of https://git-wip-us.apache.org/repos/asf/incubator-streams into STREAMS-134
Conflicts:
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f1bc422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f1bc422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f1bc422
Branch: refs/heads/STREAMS-134
Commit: 2f1bc422612d3d6030a48bd956ba9b0c1994855c
Parents: 7df265f 4e9a2bb
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Jul 15 21:00:11 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 21:00:11 2014 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[16/18] git commit: percolate implementation, untested
Posted by sb...@apache.org.
percolate implementation, untested
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b3d849d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b3d849d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b3d849d6
Branch: refs/heads/STREAMS-134
Commit: b3d849d65b4137c84f9ed12d86322d14758f9b3f
Parents: b7e4c34
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Fri Jul 11 00:49:51 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Jul 15 20:57:39 2014 -0500
----------------------------------------------------------------------
.../elasticsearch/PercolateProcessor.java | 205 +++++++++++++++----
.../ElasticsearchWriterConfiguration.json | 20 +-
.../apache/streams/data/util/ActivityUtil.java | 5 +
3 files changed, 189 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
index 9bc8d42..c5d9f4c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/PercolateProcessor.java
@@ -18,25 +18,33 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.percolate.PercolateResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
/**
* References:
@@ -48,20 +56,23 @@ import java.util.concurrent.LinkedBlockingQueue;
* [t.co behavior] https://dev.twitter.com/docs/tco-redirection-behavior
*/
-public class PercolateProcessor implements StreamsProcessor, Runnable {
+public class PercolateProcessor implements StreamsProcessor {
+
private final static Logger LOGGER = LoggerFactory.getLogger(PercolateProcessor.class);
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected Queue<StreamsDatum> inQueue;
protected Queue<StreamsDatum> outQueue;
+ public String TAGS_EXTENSION = "tags";
+
private ElasticsearchWriterConfiguration config;
private ElasticsearchClientManager manager;
+ private BulkRequestBuilder bulkBuilder;
- public PercolateProcessor(Queue<StreamsDatum> inQueue) {
- this.inQueue = inQueue;
- this.outQueue = new LinkedBlockingQueue<StreamsDatum>();
+ public PercolateProcessor(ElasticsearchConfiguration config) {
+ manager = new ElasticsearchClientManager(config);
}
public ElasticsearchClientManager getManager() {
@@ -72,7 +83,7 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.manager = manager;
}
- public ElasticsearchWriterConfiguration getConfig() {
+ public ElasticsearchConfiguration getConfig() {
return config;
}
@@ -80,16 +91,6 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
this.config = config;
}
- public void start() {
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(manager);
- Preconditions.checkNotNull(manager.getClient());
- }
-
- public void stop() {
-
- }
-
public Queue<StreamsDatum> getProcessorOutputQueue() {
return outQueue;
}
@@ -115,19 +116,21 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
json = node.asText();
}
- PercolateResponse response = manager.getClient().preparePercolate().setDocumentType(config.getType()).setSource(json).execute().actionGet();
+ PercolateResponse response = manager.getClient().preparePercolate().setSource(json).execute().actionGet();
ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
for (PercolateResponse.Match match : response.getMatches()) {
tagArray.add(match.getId().string());
-
}
- // need utility methods for get / create specific node
- ObjectNode extensions = (ObjectNode) node.get("extensions");
- ObjectNode w2o = (ObjectNode) extensions.get("w2o");
- w2o.put("tags", tagArray);
+ Activity activity = mapper.convertValue(node, Activity.class);
+
+ Map<String, Object> extensions = ActivityUtil.ensureExtensions(activity);
+
+ extensions.put(TAGS_EXTENSION, tagArray);
+
+ activity.setAdditionalProperty(ActivityUtil.EXTENSION_PROPERTY, extensions);
result.add(entry);
@@ -137,33 +140,155 @@ public class PercolateProcessor implements StreamsProcessor, Runnable {
@Override
public void prepare(Object o) {
- start();
+
+ Preconditions.checkNotNull(manager);
+ Preconditions.checkNotNull(manager.getClient());
+ Preconditions.checkNotNull(config);
+ Preconditions.checkNotNull(config.getTags());
+ Preconditions.checkArgument(config.getTags().size() > 0);
+
+ // consider using mapping to figure out what fields are included in _all
+ //manager.getClient().admin().indices().prepareGetMappings(config.getIndex()).get().getMappings().get(config.getType()).;
+
+ deleteOldQueries(config.getIndex());
+ for (Tag tag : config.getTags()) {
+ PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag.getId());
+ queryBuilder.addQuery(tag.getQuery(), FilterLevel.MUST, "_all");
+ addPercolateRule(queryBuilder, config.getIndex());
+ }
+ if (writePercolateRules() == true)
+ LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to _percolator");
+ else
+ LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to _percolator");
+
+
}
@Override
public void cleanUp() {
- stop();
+ manager.getClient().close();
}
- @Override
- public void run() {
+ public int numOfPercolateRules() {
+ return this.bulkBuilder.numberOfActions();
+ }
- while (true) {
- StreamsDatum item;
- try {
- item = inQueue.poll();
+ public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+ this.bulkBuilder.add(manager.getClient().prepareIndex("_percolator", index, builder.getId()).setSource(builder.getSource()));
+ }
- Thread.sleep(new Random().nextInt(100));
+ /**
+ *
+ * @return true if all rules were added; false indicates one or more rules have failed.
+ */
+ public boolean writePercolateRules() {
+ if(this.numOfPercolateRules() < 0) {
+ throw new RuntimeException("No Rules Have been added!");
+ }
+ BulkResponse response = this.bulkBuilder.execute().actionGet();
+ for(BulkItemResponse r : response.getItems()) {
+ if(r.isFailed()) {
+ System.out.println(r.getId()+"\t"+r.getFailureMessage());
+ }
+ }
+ return !response.hasFailures();
+ }
- for (StreamsDatum entry : process(item)) {
- outQueue.offer(entry);
- }
+ /**
+ *
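+ * @param ids ids of the percolator queries to delete
+ * @param index index name, used as the document type within _percolator
+ * @return Returns true if all of the old tags were removed. False indicates ids was empty or one or more tags were not removed.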
+ */
+ public boolean removeOldTags(Set<String> ids, String index) {
+ if(ids.size() == 0) {
+ return false;
+ }
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String id : ids) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
+ }
+ return !bulk.execute().actionGet().hasFailures();
+ }
+ public Set<String> getActivePercolateTags(String index) {
+ Set<String> tags = new HashSet<String>();
+ SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("_percolator").setTypes(index).setSize(1000);
+ SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+ SearchHits hits = response.getHits();
+ for(SearchHit hit : hits.getHits()) {
+ tags.add(hit.id());
+ }
+ return tags;
+ }
- } catch (Exception e) {
- e.printStackTrace();
+ /**
+ *
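+ * @param index index name, used as the document type within _percolator
+ * @return true if every query found was deleted; false if none were found or any delete failed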
+ */
+ public boolean deleteOldQueries(String index) {
+ Set<String> tags = getActivePercolateTags(index);
+ if(tags.size() == 0) {
+ LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+ return false;
+ }
+ LOGGER.info("Deleting {} tags.", tags.size());
+ BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+ for(String tag : tags) {
+ bulk.add(manager.getClient().prepareDelete("_percolator", index, tag));
+ }
+ BulkResponse response =bulk.execute().actionGet();
+ return !response.hasFailures();
+ }
+
+ public static class PercolateQueryBuilder {
+
+ private BoolQueryBuilder queryBuilder;
+ private String id;
+
+ public PercolateQueryBuilder(String id) {
+ this.id = id;
+ this.queryBuilder = QueryBuilders.boolQuery();
+ }
+ public void setMinumumNumberShouldMatch(int shouldMatch) {
+ this.queryBuilder.minimumNumberShouldMatch(shouldMatch);
+ }
+
+
+ public void addQuery(String query, FilterLevel level, String... fields) {
+ QueryStringQueryBuilder builder = QueryBuilders.queryString(query);
+ if(fields != null && fields.length > 0) {
+ for(String field : fields) {
+ builder.field(field);
+ }
}
+ switch (level) {
+ case MUST:
+ this.queryBuilder.must(builder);
+ break;
+ case SHOULD:
+ this.queryBuilder.should(builder);
+ break;
+ case MUST_NOT:
+ this.queryBuilder.mustNot(builder);
+ }
+ }
+
+ public String getId() {
+ return this.id;
}
+
+ public String getSource() {
+ return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
+ }
+
+
}
+
+ public enum FilterLevel {
+ MUST, SHOULD, MUST_NOT
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index a38ff85..bf7b146 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -31,6 +31,24 @@
},
"maxTimeBetweenFlushMs": {
"type": "integer"
- }
+ },
+ "tags": {
+ "type": "array",
+ "description": "Tags to apply",
+ "items": {
+ "type": "object",
+ "description": "Tag to apply",
+ "properties": {
+ "id": {
+ "type": "string",
+ "description": "Tag identifier"
+ },
+ "query": {
+ "type": "string",
+ "description": "Tag query"
+ }
+ }
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b3d849d6/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
index f1fcf44..3684b32 100644
--- a/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
+++ b/streams-pojo/src/main/java/org/apache/streams/data/util/ActivityUtil.java
@@ -18,6 +18,8 @@
package org.apache.streams.data.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
import java.util.HashMap;
@@ -27,6 +29,7 @@ import java.util.Map;
* Utility class for managing activities
*/
public class ActivityUtil {
+
private ActivityUtil() {}
/**
@@ -58,6 +61,8 @@ public class ActivityUtil {
*/
public static final String LOCATION_EXTENSION_COORDINATES = "coordinates";
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
/**
* Creates a standard extension property
* @param activity activity to create the property in
[08/18] git commit: STREAMS-105 | Code review feedback
Posted by sb...@apache.org.
STREAMS-105 | Code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/abaf5b00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/abaf5b00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/abaf5b00
Branch: refs/heads/STREAMS-134
Commit: abaf5b00b3cfb196ff736029a8dc44d3c90b7a86
Parents: 26f3614
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Mon Jul 7 13:23:11 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Mon Jul 7 13:23:11 2014 -0500
----------------------------------------------------------------------
.../datasift/serializer/DatasiftTweetActivitySerializer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/abaf5b00/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
index 0482235..158d027 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java
@@ -165,7 +165,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri
double[] coordiantes = new double[] { twitter.getGeo().getLongitude(), twitter.getGeo().getLatitude() };
Map<String, Object> coords = Maps.newHashMap();
coords.put("coordinates", coordiantes);
- coords.put("type", "point");
+ coords.put("type", "Point");
location.put("coordinates", coords);
extensions.put("location", location);
}
[09/18] git commit: Merge pull request #7 from apache/master
Posted by sb...@apache.org.
Merge pull request #7 from apache/master
Merge Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/af1f2943
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/af1f2943
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/af1f2943
Branch: refs/heads/STREAMS-134
Commit: af1f2943c765536ac0424b008d9a5f92984ca984
Parents: 3b98df8 c50ce91
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Tue Jul 8 13:35:46 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Tue Jul 8 13:35:46 2014 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/README.md | 38 +++
.../ElasticsearchConfigurator.java | 14 +
.../elasticsearch/ElasticsearchQuery.java | 34 ++-
.../ElasticsearchReaderConfiguration.json | 5 +
.../src/main/resources/reference.json | 9 +
streams-contrib/streams-persist-mongo/README.md | 16 ++
.../streams/mongo/MongoPersistReader.java | 270 +++++++++++++++++++
.../src/main/resources/reference.json | 8 +
.../src/main/resources/reference.properties | 10 -
.../twitter/processor/TwitterTypeConverter.java | 2 +-
.../TwitterJsonActivitySerializer.java | 3 +
.../TwitterJsonUserActivitySerializer.java | 72 +++++
.../serializer/util/TwitterActivityUtil.java | 22 ++
.../streams/local/builders/StreamComponent.java | 2 +-
.../local/tasks/StreamsProviderTask.java | 3 +-
15 files changed, 490 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
[12/18] git commit: Merge pull request #8 from apache/master
Posted by sb...@apache.org.
Merge pull request #8 from apache/master
Merge Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5c9a531d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5c9a531d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5c9a531d
Branch: refs/heads/STREAMS-134
Commit: 5c9a531d9227f44133fe67aea3f90ebbaf3de020
Parents: af1f294 38ed41a
Author: Robert Douglas <rd...@w2odigital.com>
Authored: Thu Jul 10 10:15:52 2014 -0500
Committer: Robert Douglas <rd...@w2odigital.com>
Committed: Thu Jul 10 10:15:52 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistReader.java | 3 +-
.../DatasiftTypeConverterProcessor.java | 12 +-----
.../DatasiftTweetActivitySerializer.java | 42 +++++++++++++++++++-
.../provider/SysomosHeartbeatStream.java | 10 +++++
.../sysomos/provider/SysomosProvider.java | 4 +-
5 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------