You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/19 01:06:37 UTC
[5/6] incubator-streams git commit: Resolves STREAMS-416
Resolves STREAMS-416
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0b512d8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0b512d8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0b512d8b
Branch: refs/heads/master
Commit: 0b512d8b6c0d2b36a635d20b9be9a481eb67d6b9
Parents: 2c12724
Author: Elias Ponvert <ep...@gmail.com>
Authored: Mon Oct 17 10:37:51 2016 -0500
Committer: Elias Ponvert <ep...@gmail.com>
Committed: Mon Oct 17 10:37:51 2016 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 5 -
.../streams-persist-cassandra/pom.xml | 168 ----
.../configuration/CassandraConfiguration.java | 81 --
.../model/CassandraActivityStreamsEntry.java | 45 -
.../CassandraActivityStreamsRepository.java | 176 ----
.../repository/impl/CassandraKeyspace.java | 61 --
.../impl/CassandraSubscriptionRepository.java | 69 --
.../spring/streams-cassandra-context.xml | 25 -
.../CassandraActivityStreamsRepositoryTest.java | 99 ---
.../impl/CassandraActivitySubscriptionTest.java | 54 --
.../streams-provider-datasift/README.md | 18 -
.../streams-provider-datasift/pom.xml | 242 ------
.../streams/datasift/csdl/DatasiftCsdlUtil.java | 132 ---
.../DatasiftActivitySerializerProcessor.java | 97 ---
.../DatasiftTypeConverterProcessor.java | 167 ----
.../datasift/provider/DatasiftConverter.java | 38 -
.../provider/DatasiftManagedSourceSetup.java | 94 ---
.../datasift/provider/DatasiftPushProvider.java | 270 ------
.../provider/DatasiftStreamConfigurator.java | 52 --
.../provider/DatasiftStreamProvider.java | 242 ------
.../streams/datasift/provider/ErrorHandler.java | 47 --
.../streams/datasift/provider/Subscription.java | 60 --
.../serializer/DatasiftActivitySerializer.java | 65 --
.../serializer/DatasiftEventClassifier.java | 53 --
.../DatasiftInstagramActivitySerializer.java | 125 ---
.../DatasiftInteractionActivitySerializer.java | 247 ------
.../DatasiftTwitterActivitySerializer.java | 271 ------
.../datasift/util/StreamsDatasiftMapper.java | 89 --
.../org/apache/streams/datasift/Datasift.json | 462 -----------
.../streams/datasift/DatasiftConfiguration.json | 136 ----
.../datasift/DatasiftPushConfiguration.json | 20 -
.../datasift/DatasiftStreamConfiguration.json | 20 -
.../streams/datasift/DatasiftWebhookData.json | 35 -
.../datasift/facebook/DatasiftFacebook.json | 125 ---
.../datasift/instagram/DatasiftInstagram.json | 183 -----
.../interaction/DatasiftInteraction.json | 97 ---
.../datasift/twitter/DatasiftTwitter.json | 370 ---------
.../datasift/twitter/DatasiftTwitterMedia.json | 132 ---
.../datasift/twitter/DatasiftTwitterUser.json | 73 --
.../src/main/resources/datasift.conf | 25 -
.../src/site/markdown/index.md | 27 -
.../com/datasift/test/DatasiftSerDeTest.java | 75 --
.../provider/DatasiftStreamProviderTest.java | 144 ----
.../datasift/provider/ErrorHandlerTest.java | 40 -
.../datasift/provider/SubscriptionTest.java | 58 --
.../DatasiftActivitySerializerIT.java | 110 ---
.../serializer/DatasiftEventClassifierTest.java | 71 --
.../DatasiftInstagramActivitySerializerIT.java | 57 --
...DatasiftInteractionActivitySerializerIT.java | 62 --
.../DatasiftTwitterActivitySerializerIT.java | 57 --
.../gnip-edc-facebook/pom.xml | 152 ----
.../test/FacebookEDCAsActivityTest.java | 93 ---
.../facebook/test/FacebookEDCSerDeTest.java | 74 --
.../gnip-edc-flickr/pom.xml | 139 ----
.../flickr/test/FlickrEDCAsActivityTest.java | 92 ---
.../gnip/flickr/test/FlickrEDCSerDeTest.java | 76 --
.../gnip-edc-googleplus/pom.xml | 102 ---
.../com/gplus/api/GPlusActivitySerializer.java | 93 ---
.../com/gplus/api/GPlusEDCAsActivityTest.java | 95 ---
.../gnip-edc-instagram/pom.xml | 116 ---
.../jsonschema/com/instagram/Instagram.json | 208 -----
.../com/instagram/test/InstagramSerDeTest.java | 70 --
.../gnip-edc-reddit/pom.xml | 103 ---
.../reddit/api/RedditActivitySerializer.java | 107 ---
.../reddit/api/RedditEDCAsActivityJSONTest.java | 97 ---
.../gnip-edc-youtube/pom.xml | 139 ----
.../java/com/gnip/test/YouTubeEDCSerDeTest.java | 79 --
.../com/gnip/test/YoutubeEDCAsActivityTest.java | 87 --
.../gnip-powertrack/README.md | 8 -
.../gnip-powertrack/pom.xml | 207 -----
.../ActivityXMLActivitySerializer.java | 240 ------
.../gnip/powertrack/GnipActivityFixer.java | 151 ----
.../PowerTrackActivitySerializer.java | 121 ---
.../src/main/jsonschema/com/gnip/Gnip.json | 815 -------------------
.../src/main/xmlschema/com/gnip/binding.xjb | 33 -
.../src/main/xmlschema/com/gnip/entry.xsd | 398 ---------
.../gnip-powertrack/src/site/markdown/index.md | 15 -
.../test/PowerTrackDeserializationTest.java | 55 --
streams-contrib/streams-provider-gnip/pom.xml | 67 --
79 files changed, 9503 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 8db46c1..b50f440 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -37,7 +37,6 @@
</properties>
<modules>
- <!--<module>streams-persist-cassandra</module>-->
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
<module>streams-persist-filebuffer</module>
@@ -47,16 +46,12 @@
<module>streams-persist-kafka</module>
<module>streams-persist-mongo</module>
<module>streams-amazon-aws</module>
- <!--<module>streams-processor-lucene</module>-->
- <!--<module>streams-processor-tika</module>-->
<module>streams-processor-jackson</module>
<module>streams-processor-json</module>
<module>streams-processor-urls</module>
<module>streams-processor-peoplepattern</module>
- <module>streams-provider-datasift</module>
<module>streams-provider-facebook</module>
<module>streams-provider-google</module>
- <module>streams-provider-gnip</module>
<module>streams-provider-instagram</module>
<module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/pom.xml b/streams-contrib/streams-persist-cassandra/pom.xml
deleted file mode 100644
index 578004c..0000000
--- a/streams-contrib/streams-persist-cassandra/pom.xml
+++ /dev/null
@@ -1,168 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-contrib</artifactId>
- <version>0.4-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>streams-persist-cassandra</artifactId>
- <name>${project.artifactId}</name>
-
- <description>Cassandra Module</description>
-
- <properties>
- <bundle.symbolicName>streams-persist-cassandra</bundle.symbolicName>
- <bundle.namespace>org.apache.streams</bundle.namespace>
- <easymock.version>3.2</easymock.version>
- </properties>
-
- <packaging>bundle</packaging>
- <build>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
-
- <resource>
- <directory>.</directory>
- <includes>
- <include>plugin.xml</include>
- <include>plugin.properties</include>
- <include>icons/**</include>
- </includes>
- </resource>
- </resources>
- <plugins>
- <plugin>
- <groupId>org.ops4j</groupId>
- <artifactId>maven-pax-plugin</artifactId>
- <!--
- | enable improved OSGi compilation support for the bundle life-cycle.
- | to switch back to the standard bundle life-cycle, move this setting
- | down to the maven-bundle-plugin section
- -->
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <version>1.4.3</version>
- <!--
- | the following instructions build a simple set of public/private classes into an OSGi bundle
- -->
- <configuration>
- <instructions>
- <Bundle-SymbolicName>${bundle.symbolicName}</Bundle-SymbolicName>
- <Bundle-Version>${project.version}</Bundle-Version>
- <Export-Package>
- ${bundle.namespace};version="${project.version}",org.apache.streams.cassandra.repository.impl, org.apache.streams.cassandra.model, org.apache.streams.cassandra.configuration
- </Export-Package>
- <Private-Package>${bundle.namespace}.cassandra.repository.impl.*,${bundle.namespace}.cassandra.model, ${bundle.namespace}.cassandra.configuration </Private-Package>
- <Import-Package>
- org.apache.rave.model,org.apache.rave.portal.model.impl,
- com.datastax.driver.core, com.datastax.driver.core.exceptions, org.codehaus.jackson.map.annotate,
- javax.persistence, org.apache.commons.logging, com.google.common.collect, org.codehaus.jackson.map,
- org.apache.commons.lang,
- org.apache.streams.osgi.components.activitysubscriber,
- org.springframework.beans.factory.annotation, org.springframework.stereotype
- </Import-Package>
- </instructions>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.rave</groupId>
- <artifactId>rave-core-api</artifactId>
- <version>${rave.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rave</groupId>
- <artifactId>rave-core</artifactId>
- <version>${rave.version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.8.2</version>
- </dependency>
-
- <dependency>
- <groupId>com.datastax.cassandra</groupId>
- <artifactId>cassandra-driver-core</artifactId>
- <version>${datastax.version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.2.9.Final</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams.osgi.components</groupId>
- <artifactId>activity-subscriber</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <version>${easymock.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-testing</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
deleted file mode 100644
index 195467d..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/configuration/CassandraConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.configuration;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CassandraConfiguration {
- @Value("${keyspaceName}")
- private String keyspaceName;
-
- @Value("${activitystreamsColumnFamilyName}")
- private String activitystreamsColumnFamilyName;
-
- @Value("${subscriptionColumnFamilyName}")
- private String subscriptionColumnFamilyName;
-
- @Value("${publisherColumnFamilyName}")
- private String publisherColumnFamilyName;
-
- @Value("${cassandraPort}")
- private String cassandraPort;
-
- public String getKeyspaceName() {
- return keyspaceName;
- }
-
- public void setKeyspaceName(String keyspaceName) {
- this.keyspaceName = keyspaceName;
- }
-
- public String getActivitystreamsColumnFamilyName() {
- return activitystreamsColumnFamilyName;
- }
-
- public void setActivitystreamsColumnFamilyName(String activitystreamsColumnFamilyName) {
- this.activitystreamsColumnFamilyName = activitystreamsColumnFamilyName;
- }
-
- public String getSubscriptionColumnFamilyName() {
- return subscriptionColumnFamilyName;
- }
-
- public void setSubscriptionColumnFamilyName(String subscriptionColumnFamilyName) {
- this.subscriptionColumnFamilyName = subscriptionColumnFamilyName;
- }
-
- public String getPublisherColumnFamilyName() {
- return publisherColumnFamilyName;
- }
-
- public void setPublisherColumnFamilyName(String publisherColumnFamilyName) {
- this.publisherColumnFamilyName = publisherColumnFamilyName;
- }
-
- public String getCassandraPort() {
- return cassandraPort;
- }
-
- public void setCassandraPort(String cassandraPort) {
- this.cassandraPort = cassandraPort;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
deleted file mode 100644
index 88db8aa..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/model/CassandraActivityStreamsEntry.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.model;
-
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.codehaus.jackson.map.annotate.JsonDeserialize;
-
-import java.util.Date;
-
-public class CassandraActivityStreamsEntry extends ActivityStreamsEntryImpl implements Comparable<CassandraActivityStreamsEntry>{
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject object;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject target;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject actor;
-
- @JsonDeserialize(as=ActivityStreamsObjectImpl.class)
- private ActivityStreamsObject provider;
-
- public int compareTo(CassandraActivityStreamsEntry entry){
- return (this.getPublished()).compareTo(entry.getPublished());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
deleted file mode 100644
index 56e5416..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepository.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-
-public class CassandraActivityStreamsRepository {
-
- private static final Log LOG = LogFactory.getLog(CassandraActivityStreamsRepository.class);
-
- private CassandraKeyspace keyspace;
- private CassandraConfiguration configuration;
-
- @Autowired
- public CassandraActivityStreamsRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
- this.configuration = configuration;
- this.keyspace = keyspace;
-
- try {
- keyspace.getSession().execute("CREATE TABLE " + configuration.getActivitystreamsColumnFamilyName() + " (" +
- "id text, " +
- "published timestamp, " +
- "verb text, " +
- "tags text, " +
-
- "actor_displayname text, " +
- "actor_id text, " +
- "actor_url text, " +
- "actor_objecttype text, " +
-
- "target_displayname text, " +
- "target_id text, " +
- "target_url text, " +
-
- "provider_url text, " +
-
- "object_url text, " +
- "object_displayname text, " +
- "object_id text, " +
- "object_objecttype text, " +
-
- "PRIMARY KEY (id, tags, published));");
- } catch (AlreadyExistsException ignored) {
- }
- }
-
- public void save(ActivityStreamsEntry entry) {
- String sql = "INSERT INTO " + configuration.getActivitystreamsColumnFamilyName() + " (" +
- "id, published, verb, tags, " +
- "actor_displayname, actor_objecttype, actor_id, actor_url, " +
- "target_displayname, target_id, target_url, " +
- "provider_url, " +
- "object_displayname, object_objecttype, object_id, object_url) " +
- "VALUES ('" +
- entry.getId() + "','" +
- entry.getPublished().getTime() + "','" +
- entry.getVerb() + "','" +
- entry.getTags() + "','" +
-
- entry.getActor().getDisplayName() + "','" +
- entry.getActor().getObjectType() + "','" +
- entry.getActor().getId() + "','" +
- entry.getActor().getUrl() + "','" +
-
- entry.getTarget().getDisplayName() + "','" +
- entry.getTarget().getId() + "','" +
- entry.getTarget().getUrl() + "','" +
-
- entry.getProvider().getUrl() + "','" +
-
- entry.getObject().getDisplayName() + "','" +
- entry.getObject().getObjectType() + "','" +
- entry.getObject().getId() + "','" +
- entry.getObject().getUrl() +
-
- "')";
- keyspace.getSession().execute(sql);
- }
-
- public List<CassandraActivityStreamsEntry> getActivitiesForFilters(List<String> filters, Date lastUpdated) {
- List<CassandraActivityStreamsEntry> results = new ArrayList<CassandraActivityStreamsEntry>();
-
- for (String tag : filters) {
- String cql = "SELECT * FROM " + configuration.getActivitystreamsColumnFamilyName() + " WHERE ";
-
- //add filters
- cql = cql + " tags = '" + tag + "' AND ";
-
- //specify last modified
- cql = cql + "published > " + lastUpdated.getTime() + " ALLOW FILTERING";
-
- //execute the cql query and store the results
- ResultSet set = keyspace.getSession().execute(cql);
-
- //iterate through the results and create a new ActivityStreamsEntry for every result returned
-
- for (Row row : set) {
- CassandraActivityStreamsEntry entry = new CassandraActivityStreamsEntry();
- ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
- ActivityStreamsObject target = new ActivityStreamsObjectImpl();
- ActivityStreamsObject object = new ActivityStreamsObjectImpl();
- ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
- actor.setDisplayName(row.getString("actor_displayname"));
- actor.setId(row.getString("actor_id"));
- actor.setObjectType(row.getString("actor_objecttype"));
- actor.setUrl(row.getString("actor_url"));
-
- target.setDisplayName(row.getString("target_displayname"));
- target.setId(row.getString("target_id"));
- target.setUrl(row.getString("target_url"));
-
- object.setDisplayName(row.getString("object_displayname"));
- object.setObjectType(row.getString("object_objecttype"));
- object.setUrl(row.getString("object_url"));
- object.setId(row.getString("object_id"));
-
- provider.setUrl(row.getString("provider_url"));
-
- entry.setPublished(row.getDate("published"));
- entry.setVerb(row.getString("verb"));
- entry.setId(row.getString("id"));
- entry.setTags(row.getString("tags"));
- entry.setActor(actor);
- entry.setTarget(target);
- entry.setObject(object);
- entry.setProvider(provider);
-
- results.add(entry);
- }
- }
-
- return results;
- }
-
- public void dropTable(String table) {
- String cql = "DROP TABLE " + table;
- keyspace.getSession().execute(cql);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
deleted file mode 100644
index 9106a51..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraKeyspace.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraKeyspace {
- private CassandraConfiguration configuration;
- private Cluster cluster;
- private Session session;
-
- @Autowired
- public CassandraKeyspace(CassandraConfiguration configuration){
- this.configuration = configuration;
-
- cluster = Cluster.builder().addContactPoint(configuration.getCassandraPort()).build();
- session = cluster.connect();
-
- //TODO: cassandra 2 will have support for CREATE KEYSPACE IF NOT EXISTS
- try {
- session.execute("CREATE KEYSPACE " + configuration.getKeyspaceName() + " WITH replication = { 'class': 'SimpleStrategy','replication_factor' : 1 };");
- } catch (AlreadyExistsException ignored) {
- }
-
- //connect to the keyspace
- session = cluster.connect(configuration.getKeyspaceName());
- }
-
- public Session getSession(){
- return session;
- }
-
- @Override
- protected void finalize() throws Throwable {
- try {
- cluster.shutdown();
- } finally {
- super.finalize();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java b/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
deleted file mode 100644
index f5fe471..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/java/org/apache/streams/cassandra/repository/impl/CassandraSubscriptionRepository.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class CassandraSubscriptionRepository {
- private static final Log LOG = LogFactory.getLog(CassandraSubscriptionRepository.class);
-
- private CassandraKeyspace keyspace;
- private CassandraConfiguration configuration;
-
- @Autowired
- public CassandraSubscriptionRepository(CassandraKeyspace keyspace, CassandraConfiguration configuration) {
- this.keyspace = keyspace;
- this.configuration = configuration;
-
- try {
- keyspace.getSession().execute("CREATE TABLE " + configuration.getSubscriptionColumnFamilyName() + " (" +
- "id text, " +
- "filters text, " +
-
- "PRIMARY KEY (id));");
- } catch (AlreadyExistsException ignored) {
- }
- }
-
- public String getFilters(String id){
- String cql = "SELECT * FROM " + configuration.getSubscriptionColumnFamilyName() + " WHERE id = '" + id+"';";
-
- ResultSet set = keyspace.getSession().execute(cql);
-
- return set.one().getString("filters");
- }
-
- public void save(ActivityStreamsSubscription subscription){
- String cql = "INSERT INTO " + configuration.getSubscriptionColumnFamilyName() + " (" +
- "id, filters) " +
- "VALUES ('" +
- subscription.getAuthToken() + "','" +
- StringUtils.join(subscription.getFilters(), " ") +
-
- "')";
- keyspace.getSession().execute(cql);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml b/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
deleted file mode 100644
index 842c918..0000000
--- a/streams-contrib/streams-persist-cassandra/src/main/resources/META_INF/spring/streams-cassandra-context.xml
+++ /dev/null
@@ -1,25 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<beans
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
deleted file mode 100644
index 978af10..0000000
--- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivityStreamsRepositoryTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import com.datastax.driver.core.ResultSet;
-import org.apache.rave.model.ActivityStreamsEntry;
-import org.apache.rave.model.ActivityStreamsObject;
-import org.apache.rave.portal.model.impl.ActivityStreamsEntryImpl;
-import org.apache.rave.portal.model.impl.ActivityStreamsObjectImpl;
-import org.apache.streams.cassandra.configuration.CassandraConfiguration;
-import org.apache.streams.cassandra.model.CassandraActivityStreamsEntry;
-import static org.easymock.EasyMock.*;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-public class CassandraActivityStreamsRepositoryTest {
-
- private CassandraActivityStreamsRepository repository;
-
-
- @Before
- public void setup() {
- CassandraKeyspace keyspace = createMock(CassandraKeyspace.class);
- CassandraConfiguration configuration = createMock(CassandraConfiguration.class);
- repository = new CassandraActivityStreamsRepository(keyspace, configuration);
- }
-
- @Ignore
- @Test
- public void saveActivity() {
- ActivityStreamsEntry entry = new ActivityStreamsEntryImpl();
- ActivityStreamsObject actor = new ActivityStreamsObjectImpl();
- ActivityStreamsObject target = new ActivityStreamsObjectImpl();
- ActivityStreamsObject object = new ActivityStreamsObjectImpl();
- ActivityStreamsObject provider = new ActivityStreamsObjectImpl();
-
- actor.setId("actorid1");
- actor.setUrl("actorurl1");
- actor.setDisplayName("actorname1");
-
- target.setId("targetid1");
- target.setUrl("targeturl1");
- target.setDisplayName("r501");
-
- provider.setUrl("providerurl");
-
- object.setId("objectid1");
- object.setDisplayName("objectname1");
-
- entry.setId("dink");
- entry.setVerb("verb1");
- entry.setTags("r501");
- entry.setProvider(provider);
- Date d = new Date();
- entry.setPublished(d);
- entry.setActor(actor);
- entry.setObject(object);
- entry.setTarget(target);
-
- repository.save(entry);
- }
-
- @Ignore
- @Test
- public void getActivity() {
- String cql = "tags";
- String other = "r501";
- List<String> f = Arrays.asList(cql, other);
- Date d = new Date(0);
- List<CassandraActivityStreamsEntry> results = repository.getActivitiesForFilters(f,d);
- }
-
- @Ignore
- @Test
- public void dropTableTest(){
- repository.dropTable("coltest");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java b/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
deleted file mode 100644
index 2a90462..0000000
--- a/streams-contrib/streams-persist-cassandra/src/test/java/org/apache/streams/cassandra/repository/impl/CassandraActivitySubscriptionTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.cassandra.repository.impl;
-
-import org.apache.streams.osgi.components.activitysubscriber.ActivityStreamsSubscription;
-import org.apache.streams.osgi.components.activitysubscriber.impl.ActivityStreamsSubscriptionImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class CassandraActivitySubscriptionTest {
-
- public CassandraSubscriptionRepository repository;
-
-
- @Before
- public void setup() {
-// repository = new CassandraSubscriptionRepository();
- }
-
- @Ignore
- @Test
- public void saveTest(){
- ActivityStreamsSubscription subscription = new ActivityStreamsSubscriptionImpl();
- subscription.setFilters(Arrays.asList("thisis", "atest"));
- subscription.setAuthToken("subid");
-
- repository.save(subscription);
- }
-
- @Ignore
- @Test
- public void getTest(){
- String filters = repository.getFilters("subid");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/README.md b/streams-contrib/streams-provider-datasift/README.md
deleted file mode 100644
index a29fb16..0000000
--- a/streams-contrib/streams-provider-datasift/README.md
+++ /dev/null
@@ -1,18 +0,0 @@
-streams-provider-datasift
-=====================
-
-Datasift Provider
-
-Example configuration:
-
- datasift {
- apiKey = ""
- userName = ""
- hashes = [
- "b8aaf7cec5faa2fadbd55d651933a31e",
- "f41f054e2a2ba8d2e7b0d74f56e727d6"
- ]
- }
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
deleted file mode 100644
index def7826..0000000
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ /dev/null
@@ -1,242 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-contrib</artifactId>
- <version>0.4-incubating-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>streams-provider-datasift</artifactId>
- <name>${project.artifactId}</name>
-
- <description>Datasift Provider</description>
-
- <properties>
- <skipITs>true</skipITs>
- <testDataBaseURl>http://streams.peoplepattern.com.s3.amazonaws.com/test-data/</testDataBaseURl>
- </properties>
-
-
- <dependencies>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-config</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-pojo</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-processor-jackson</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-provider-twitter</artifactId>
- <version>${project.version}</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-provider-instagram</artifactId>
- <version>${project.version}</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>com.datasift.client</groupId>
- <artifactId>datasift-java</artifactId>
- <version>3.2.6</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>commons-logging</artifactId>
- <groupId>commons-logging</groupId>
- </exclusion>
- <exclusion>
- <groupId>com.boundary</groupId>
- <artifactId>high-scale-lib</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.boundary</groupId>
- <artifactId>high-scale-lib</artifactId>
- <version>1.0.6</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-core</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.9.5</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-util</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-testing</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- </dependencies>
-
- <build>
- <sourceDirectory>src/main/java</sourceDirectory>
- <testSourceDirectory>src/test/java17</testSourceDirectory>
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/test/resources</directory>
- </testResource>
- </testResources>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>target/generated-sources/jsonschema2pojo</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-test-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/java17</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.jsonschema2pojo</groupId>
- <artifactId>jsonschema2pojo-maven-plugin</artifactId>
- <configuration>
- <addCompileSourceRoot>true</addCompileSourceRoot>
- <generateBuilders>true</generateBuilders>
- <sourceDirectory>${project.basedir}/src/main/jsonschema</sourceDirectory>
- <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
- <targetPackage>org.apache.streams.datasift</targetPackage>
- <useLongIntegers>true</useLongIntegers>
- <useJodaDates>true</useJodaDates>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.googlecode.maven-download-plugin</groupId>
- <artifactId>download-maven-plugin</artifactId>
- <version>1.2.1</version>
- <executions>
- <execution>
- <id>download-it-data</id>
- <phase>pre-integration-test</phase>
- <goals>
- <goal>wget</goal>
- </goals>
- <configuration>
- <url>${testDataBaseURl}/${project.artifactId}.zip</url>
- <unpack>true</unpack>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
- <!--<md5>df65b5642f33676313ebe4d5b69a3fff</md5>-->
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <configuration>
- <skipTests>${skipITs}</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
- </plugin>
-
- </plugins>
-
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
deleted file mode 100644
index 049ed8c..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/csdl/DatasiftCsdlUtil.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.csdl;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.ListIterator;
-
-public class DatasiftCsdlUtil {
-
- private static final Logger log = LoggerFactory
- .getLogger(DatasiftCsdlUtil.class);
-
- public static String csdlFromTwitterUserIds(List<String> list) throws Exception {
-
- StringBuilder csdlBuilder = new StringBuilder();
-
- csdlBuilder.append("twitter.user.id in [");
- ListIterator<String> listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("]\n");
- csdlBuilder.append(" OR\n");
- csdlBuilder.append("twitter.in_reply_to_user_id contains_any \"");
- listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
- csdlBuilder.append(" OR\n");
- csdlBuilder.append("twitter.mention_ids in [");
- listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("]\n");
-
- log.debug(csdlBuilder.toString());
-
- return csdlBuilder.toString();
- }
-
- public static String csdlFromTwitterUserNames(List<String> list) throws Exception {
-
- StringBuilder csdlBuilder = new StringBuilder();
-
- csdlBuilder.append("twitter.user.screen_name contains_any \"");
- ListIterator<String> listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
- csdlBuilder.append(" OR\n");
- csdlBuilder.append("twitter.in_reply_to_screen_name contains_any \"");
- listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
- csdlBuilder.append(" OR\n");
- csdlBuilder.append("twitter.mentions contains_any \"");
- listIterator = Lists.newArrayList(list).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
-
- log.debug(csdlBuilder.toString());
-
- return csdlBuilder.toString();
- }
-
- public static String csdlFromKeywords(List<String> include, List<String> exclude) throws Exception {
-
- StringBuilder csdlBuilder = new StringBuilder();
-
- csdlBuilder.append("interaction.content contains_any \"");
- ListIterator<String> listIterator = Lists.newArrayList(include).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
- csdlBuilder.append(" AND NOT ( \n");
- csdlBuilder.append("interaction.content \"");
- listIterator = Lists.newArrayList(exclude).listIterator();
- while( listIterator.hasNext() ) {
- csdlBuilder.append(listIterator.next());
- if (listIterator.hasNext())
- csdlBuilder.append(",");
- }
- csdlBuilder.append("\"\n");
- csdlBuilder.append(")\n");
-
- log.debug(csdlBuilder.toString());
-
- return csdlBuilder.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
deleted file mode 100644
index 43b16b2..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftActivitySerializerProcessor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.provider.DatasiftConverter;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftActivitySerializerProcessor implements StreamsProcessor {
-
- private final static String STREAMS_ID = "DatasiftActivitySerializerProcessor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftActivitySerializerProcessor.class);
-
- private ObjectMapper mapper;
- private Class outClass;
- private DatasiftActivitySerializer datasiftActivitySerializer;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public DatasiftActivitySerializerProcessor(Class outClass) {
- this.outClass = outClass;
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- List<StreamsDatum> result = Lists.newLinkedList();
- Activity activity;
- try {
- Datasift node;
- if( entry.getDocument() instanceof String ) {
- node = this.mapper.readValue((String)entry.getDocument(), Datasift.class);
- } else if( entry.getDocument() instanceof Datasift ) {
- node = (Datasift) entry.getDocument();
- } else {
- node = this.mapper.convertValue(entry.getDocument(), Datasift.class);
- }
- if(node != null) {
- activity = this.datasiftActivitySerializer.deserialize(node);
- StreamsDatum datum = new StreamsDatum(activity, entry.getId(), entry.getTimestamp(), entry.getSequenceid());
- datum.setMetadata(entry.getMetadata());
- result.add(datum);
- }
- } catch (Exception e) {
- LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
- }
- return result;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT));
- this.datasiftActivitySerializer = new DatasiftActivitySerializer();
- }
-
- @Override
- public void cleanUp() {
-
- }
-
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
deleted file mode 100644
index dcffd1a..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.processor;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.provider.DatasiftConverter;
-import org.apache.streams.datasift.serializer.DatasiftActivitySerializer;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.jackson.CleanAdditionalPropertiesProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- *
- */
-public class DatasiftTypeConverterProcessor implements StreamsProcessor {
-
- private final static String STREAMS_ID = "RegexUrlExtractor";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftTypeConverterProcessor.class);
-
- private ObjectMapper mapper;
- private Class outClass;
- private DatasiftActivitySerializer datasiftInteractionActivitySerializer;
- private DatasiftConverter converter;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public DatasiftTypeConverterProcessor(Class outClass) {
- this.outClass = outClass;
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
- List<StreamsDatum> result = Lists.newLinkedList();
- Object doc;
- try {
- if( entry.getDocument() instanceof String ) {
- ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class);
- doc = this.converter.convert(node, this.mapper);
- } else {
- doc = this.converter.convert(entry.getDocument(), this.mapper);
- }
- if(doc != null) {
- result.add(new StreamsDatum(doc, entry.getId()));
- }
- } catch (Exception e) {
- LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
- }
- return result;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.mapper = StreamsDatasiftMapper.getInstance();
- this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
- if(this.outClass.equals(Activity.class)) {
- this.converter = new ActivityConverter();
- } else if (this.outClass.equals(String.class)) {
- this.converter = new StringConverter();
- } else {
- LOGGER.warn("Using defaulting datasift converter");
- this.converter = new DefaultConverter(this.outClass);
- }
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- private class ActivityConverter implements DatasiftConverter {
-
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- if(toConvert instanceof Activity)
- return toConvert;
- try {
- if(toConvert instanceof String)
- return datasiftInteractionActivitySerializer.deserialize((String) toConvert);
- return mapper.convertValue(toConvert, Activity.class);
- } catch (Exception e) {
- LOGGER.error("Exception while trying to convert {} to a Activity.", toConvert.getClass());
- LOGGER.error("Exception : {}", e);
- e.printStackTrace();
- return null;
- }
- }
-
-
- }
-
- private class StringConverter implements DatasiftConverter {
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- try {
- if(toConvert instanceof String){
- return mapper.writeValueAsString(mapper.readValue((String) toConvert, Datasift.class));
- } else {
- if(toConvert.getClass().equals(Activity.class)) { //hack to remove additional properties
- ObjectNode node = mapper.convertValue(toConvert, ObjectNode.class);
- CleanAdditionalPropertiesProcessor.cleanAdditionalProperties(node);
- return mapper.writeValueAsString(node);
- } else
- return mapper.writeValueAsString(toConvert);
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to write {} as a String.", toConvert.getClass());
- LOGGER.error("Exception : {}", e);
- return null;
- }
- }
- }
-
- private class DefaultConverter implements DatasiftConverter {
-
- private Class clazz;
-
- public DefaultConverter(Class clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public Object convert(Object toConvert, ObjectMapper mapper) {
- try {
- if(toConvert instanceof String) {
- return mapper.readValue((String) toConvert, this.clazz);
- } else {
- return mapper.convertValue(toConvert, this.clazz);
- }
-
- } catch (Exception e) {
- throw new RuntimeException("Failed converting +"+ toConvert.getClass().getName()+" to "+ this.clazz.getName());
- }
- }
- }
-};
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
deleted file mode 100644
index f978205..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftConverter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Converts a {@link org.apache.streams.datasift.Datasift} object to a StreamsDatum
- */
-public interface DatasiftConverter {
-
- /**
- * Converts a datasift related object to the desired resulting object.
- * @param toConvert
- * @param mapper
- * @return
- */
- public Object convert(Object toConvert, ObjectMapper mapper);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
deleted file mode 100644
index 8200ce2..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftManagedSourceSetup.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.DataSiftClient;
-import com.datasift.client.managedsource.ManagedSource;
-import com.datasift.client.managedsource.ManagedSourceList;
-import com.datasift.client.managedsource.sources.DataSource;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.managed.StreamsManagedSource;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by sblackmon on 8/8/14.
- */
-public class DatasiftManagedSourceSetup implements Runnable {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamProvider.class);
-
- private static DatasiftConfiguration config = DatasiftStreamConfigurator.detectConfiguration(StreamsConfigurator.config);
-
- private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
-
- DataSiftClient client;
- Map<String, ManagedSource> currentManagedSourceMap = Maps.newHashMap();
- List<StreamsManagedSource> updatedManagedSourceList;
-
- public static void main(String[] args) {
- DatasiftManagedSourceSetup job = new DatasiftManagedSourceSetup();
- (new Thread(job)).start();
- }
-
- @Override
- public void run() {
-
- setup();
-
- current();
-
- updatedManagedSourceList = config.getManagedSources();
-
- for( StreamsManagedSource source : updatedManagedSourceList ) {
- ManagedSource current = currentManagedSourceMap.get( source.getId() );
- LOGGER.info( "CURRENT: " + current );
- // merge 'em
- ManagedSource working = MAPPER.convertValue(source, ManagedSource.class);
- LOGGER.info( "WORKING: " + working );
- ManagedSource updated = client.managedSource().update(current.getName(), (DataSource) working, current).sync();
- LOGGER.info( "UPDATED: " + updated );
-
- }
-
- }
-
- public void setup() {
-
- client = new DatasiftStreamProvider(null, config).getNewClient(config.getUserName(), config.getApiKey());
- }
-
- public void current() {
- ManagedSourceList managedSources = client.managedSource().get().sync();
- Iterator<ManagedSource> managedSourceIterator = managedSources.iterator();
- while( managedSourceIterator.hasNext() ) {
- ManagedSource source = managedSourceIterator.next();
- currentManagedSourceMap.put(source.getId(), source);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
deleted file mode 100644
index bdd2c97..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.stream.DeletedInteraction;
-import com.datasift.client.stream.StreamEventListener;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.datasift.Datasift;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.DatasiftWebhookData;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-/**
- * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface, with
- * annotations that allow it to bind as jersey resources within streams-runtime-dropwizard.
- *
- * Whereas GenericWebhookResource outputs ObjectNode datums, DatasiftPushProvider outputs Datasift datums, with
- * metadata when the json_meta endpoint is used.
- */
-@Resource
-@Path("/streams/webhooks/datasift")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider {
-
- private final static String STREAMS_ID = "DatasiftPushProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @POST
- @Path("json")
- public Response json(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- StreamsDatum datum = new StreamsDatum(body);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- }
-
- @POST
- @Path("json_new_line")
- public Response json_new_line(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- if (body.equalsIgnoreCase("{}")) {
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
- }
-
- try {
-
- for( String item : Splitter.on(newLinePattern).split(body)) {
- StreamsDatum datum = new StreamsDatum(item);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- }
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- } catch (Exception e) {
- LOGGER.warn(e.toString(), e);
-
- Boolean success = false;
-
- response.put("success", success);
-
- return Response.status(500).entity(response).build();
-
- }
-
- }
-
- @POST
- @Path("json_meta")
- public Response json_meta(@Context HttpHeaders headers,
- String body) {
-
- //log.debug(headers.toString(), headers);
-
- //log.debug(body.toString(), body);
-
- ObjectNode response = mapper.createObjectNode();
-
- if (body.equalsIgnoreCase("{}")) {
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
- }
-
- try {
-
- DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class);
-
- for( Datasift item : objectWrapper.getInteractions()) {
-
- String json = mapper.writeValueAsString(item);
-
- StreamsDatum datum = new StreamsDatum(json);
- if( item.getInteraction() != null &&
- !Strings.isNullOrEmpty(item.getInteraction().getId())) {
- datum.setId(item.getInteraction().getId());
- }
- if( item.getInteraction() != null &&
- item.getInteraction().getCreatedAt() != null) {
- datum.setTimestamp(item.getInteraction().getCreatedAt());
- }
- Map<String, Object> metadata = Maps.newHashMap();
- metadata.put("hash", objectWrapper.getHash());
- metadata.put("hashType", objectWrapper.getHashType());
- metadata.put("id",objectWrapper.getId());
-
- if( item.getInteraction() != null &&
- item.getInteraction().getTags() != null &&
- item.getInteraction().getTags().size() > 0) {
- metadata.put("tags", item.getInteraction().getTags());
- }
-
- datum.setMetadata(metadata);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
- }
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- } catch (Exception e) {
- LOGGER.warn(e.toString(), e);
- }
-
- return Response.status(500).build();
- }
-
- @Override
- public void startStream() {
- return;
- }
-
- @Override
- public StreamsResultSet readCurrent() {
-
- StreamsResultSet current;
-
- lock.writeLock().lock();
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
- providerQueue.clear();
- lock.writeLock().unlock();
-
- return current;
-
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return true;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0b512d8b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
deleted file mode 100644
index 6ec395d..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamConfigurator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.streams.datasift.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.apache.streams.datasift.util.StreamsDatasiftMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class DatasiftStreamConfigurator {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftStreamConfigurator.class);
-
- private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance();
-
- public static DatasiftConfiguration detectConfiguration(Config datasift) {
-
- DatasiftConfiguration datasiftConfiguration = null;
-
- try {
- datasiftConfiguration = MAPPER.readValue(datasift.root().render(ConfigRenderOptions.concise()), DatasiftConfiguration.class);
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn("Could not parse datasiftConfiguration");
- }
- return datasiftConfiguration;
- }
-
-}