You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:08 UTC
[1/9] incubator-streams git commit: drop wizard runtime
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-222 [created] 149eb2310
drop wizard runtime
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a57c0172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a57c0172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a57c0172
Branch: refs/heads/STREAMS-222
Commit: a57c017211a499372fda1a511b6a7db063360f04
Parents: 91dd9a3
Author: sblackmon <sb...@apache.org>
Authored: Wed Nov 19 12:54:57 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Wed Nov 19 12:54:57 2014 -0600
----------------------------------------------------------------------
streams-runtimes/pom.xml | 1 +
.../streams-runtime-dropwizard/pom.xml | 248 +++++++++++++++++++
.../streams/dropwizard/StreamsApplication.java | 112 +++++++++
.../dropwizard/StreamsDropwizardModule.java | 54 ++++
.../streams/dropwizard/WebhookResource.java | 222 +++++++++++++++++
.../StreamsDropwizardConfiguration.json | 8 +
6 files changed, 645 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 5d43c28..7f9f68a 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -33,6 +33,7 @@
<packaging>pom</packaging>
<modules>
+ <module>streams-runtime-dropwizard</module>
<module>streams-runtime-local</module>
<module>streams-runtime-pig</module>
<module>streams-runtime-storm</module>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
new file mode 100644
index 0000000..ab6e927
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -0,0 +1,248 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtimes</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streams-runtime-dropwizard</artifactId>
+
+ <properties>
+ <dropwizard.version>0.7.1</dropwizard.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-core</artifactId>
+ <version>${dropwizard.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-afterburner</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-configuration</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-validation</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-afterburner</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.hubspot.dropwizard</groupId>
+ <artifactId>dropwizard-guice</artifactId>
+ <version>0.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <version>3.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </dependency>
+
+ <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.dropwizard</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
new file mode 100644
index 0000000..733b078
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -0,0 +1,112 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.guava.GuavaModule;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import io.dropwizard.Application;
+import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.setup.Bootstrap;
+import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.console.ConsolePersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Dropwizard application that creates a {@link WebhookResource} and configures a
+ * local stream which reads from it and writes to a {@link ConsolePersistWriter}.
+ */
+public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(StreamsApplication.class);
+
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private StreamBuilder builder;
+
+    private WebhookResource webhook;
+
+    /** Optional value copied from the single command-line argument in {@link #main(String[])}. */
+    private String broadcastURI;
+
+    /** Single worker used to run the stream setup off the calling thread. */
+    private final Executor executor = Executors.newSingleThreadExecutor();
+
+    static {
+        // Register the extra Jackson modules on the mapper obtained from StreamsJacksonMapper.
+        mapper.registerModule(new AfterburnerModule());
+        mapper.registerModule(new GuavaModule());
+        mapper.registerModule(new GuavaExtrasModule());
+    }
+
+    /**
+     * Installs a Guice bundle that uses {@link StreamsDropwizardModule} and enables
+     * auto-configuration for this class's package.
+     *
+     * @param bootstrap the dropwizard bootstrap to add the bundle to
+     */
+    @Override
+    public void initialize(Bootstrap<StreamsDropwizardConfiguration> bootstrap) {
+
+        final String basePackage = getClass().getPackage().getName();
+
+        LOGGER.info(basePackage);
+
+        GuiceBundle<StreamsDropwizardConfiguration> guiceBundle =
+                GuiceBundle.<StreamsDropwizardConfiguration>newBuilder()
+                        .addModule(new StreamsDropwizardModule())
+                        .setConfigClass(StreamsDropwizardConfiguration.class)
+                        // override and add more packages to pick up custom Resources
+                        .enableAutoConfig(basePackage)
+                        .build();
+        bootstrap.addBundle(guiceBundle);
+
+    }
+
+    /**
+     * Creates the webhook resource, submits the stream setup to the executor and then
+     * blocks for ten seconds.
+     *
+     * @param streamsDropwizardConfiguration the parsed application configuration (not read here)
+     * @param environment the dropwizard environment (not read here)
+     * @throws Exception if the wait is interrupted
+     */
+    @Override
+    public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
+
+        webhook = new WebhookResource();
+
+        // Reuse the one executor created with this instance rather than allocating another.
+        executor.execute(new StreamsDropwizardRunner());
+
+        // wait for streams to start up
+        Thread.sleep(10000);
+
+        //environment.jersey().register(webhook);
+
+    }
+
+    /**
+     * Builds the stream definition: the webhook resource as a perpetual provider named
+     * "webhooks", drained by a console writer named "console".
+     */
+    private class StreamsDropwizardRunner implements Runnable {
+
+        @Override
+        public void run() {
+
+            Map<String, Object> streamConfig = Maps.newHashMap();
+            // int arithmetic: the product is 1,200,000,000, which still fits in an int.
+            streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+            if (!Strings.isNullOrEmpty(broadcastURI)) {
+                streamConfig.put("broadcastURI", broadcastURI);
+            }
+            builder = new LocalStreamBuilder(1000, streamConfig);
+
+            // prepare stream components
+            builder.newPerpetualStream("webhooks", webhook);
+
+            builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
+
+            // NOTE(review): the builder is configured here but nothing in this class
+            // calls start() on it -- confirm whether the stream is started elsewhere.
+        }
+    }
+
+
+    /**
+     * Entry point. When exactly one argument is given it is also stored as the broadcast URI;
+     * all arguments are then handed to the dropwizard command-line runner.
+     *
+     * @param args command-line arguments
+     * @throws Exception if the application fails to start
+     */
+    public static void main(String[] args) throws Exception
+    {
+
+        StreamsApplication application = new StreamsApplication();
+        if (args.length == 1) {
+            application.broadcastURI = args[0];
+        }
+        application.run(args);
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
new file mode 100644
index 0000000..4264dbb
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -0,0 +1,54 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+
+import java.io.IOException;
+
+/**
+ * Guice module for the dropwizard runtime. It declares no explicit bindings and
+ * exposes a single provider method for {@link StreamsConfiguration}.
+ *
+ * Created by sblackmon on 11/18/14.
+ */
+public class StreamsDropwizardModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ // anything you'd like to configure
+ }
+
+ /**
+ * Provides the {@link StreamsConfiguration} returned by
+ * {@link StreamsConfigurator#detectConfiguration()}.
+ *
+ * @param configuration the dropwizard configuration; not consulted by this method
+ * @return the detected streams configuration
+ */
+ @Provides
+ public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+ return StreamsConfigurator.detectConfiguration();
+ }
+
+ // Disabled draft of a merge step: it serializes the dropwizard configuration to JSON,
+ // parses it as a typesafe Config, uses it as the fallback for configTypesafe, and reads
+ // the merged JSON back into a StreamsDropwizardConfiguration. It refers to mapper, LOGGER
+ // and configTypesafe, none of which are declared in this class.
+// private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
+//
+// // config from dropwizard
+// Config configDropwizard = null;
+// try {
+// configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
+// } catch (JsonProcessingException e) {
+// e.printStackTrace();
+// LOGGER.error("Invalid Configuration: " + streamsConfiguration);
+// }
+//
+// Config combinedConfig = configTypesafe.withFallback(configDropwizard);
+// String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
+//
+// StreamsDropwizardConfiguration combinedDropwizardConfig = null;
+// try {
+// combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
+// } catch (IOException e) {
+// e.printStackTrace();
+// LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
+// }
+//
+// return combinedDropwizardConfig;
+//
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
new file mode 100644
index 0000000..1f80c5c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
@@ -0,0 +1,222 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class WebhookResource implements StreamsProvider {
+
+ public WebhookResource() {
+ }
+
+ private static final Logger log = LoggerFactory
+ .getLogger(WebhookResource.class);
+
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+ @POST
+ @Path("json")
+ public Response json(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ StreamsDatum datum = new StreamsDatum(body);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ }
+
+ @POST
+ @Path("json_new_line")
+ public Response json_new_line(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ if (body.equalsIgnoreCase("{}")) {
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+ }
+
+ try {
+
+ for( String item : Splitter.on('\n').split(body)) {
+ StreamsDatum datum = new StreamsDatum(item);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ }
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+
+ Boolean success = false;
+
+ response.put("success", success);
+
+ return Response.status(500).entity(response).build();
+
+ }
+
+ }
+
+// @POST
+// @Path("json_meta")
+// public Response json_meta(@Context HttpHeaders headers,
+// String body) {
+//
+// //log.debug(headers.toString(), headers);
+//
+// //log.debug(body.toString(), body);
+//
+// ObjectNode response = mapper.createObjectNode();
+//
+// if (body.equalsIgnoreCase("{}")) {
+//
+// Boolean success = true;
+//
+// response.put("success", success);
+//
+// return Response.status(200).entity(response).build();
+// }
+//
+// try {
+//
+// ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
+//
+// for( ObjectNode item : objectWrapper.getData()) {
+//
+// String json = mapper.writeValueAsString(item);
+//
+// StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
+//
+// lock.writeLock().lock();
+// ComponentUtils.offerUntilSuccess(datum, providerQueue);
+// lock.writeLock().unlock();
+// }
+//
+// log.info("interactionQueue: " + providerQueue.size());
+//
+// Boolean success = true;
+//
+// response.put("success", success);
+//
+// return Response.status(200).entity(response).build();
+//
+// } catch (Exception e) {
+// log.warn(e.toString(), e);
+// }
+//
+// return Response.status(500).build();
+// }
+
+ @Override
+ public void startStream() {
+ return;
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+
+ lock.writeLock().lock();
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ providerQueue.clear();
+ lock.writeLock().unlock();
+
+ return current;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+ public void addDatum(StreamsDatum datum) {
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
new file mode 100644
index 0000000..9faee44
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
@@ -0,0 +1,8 @@
+{
+ "type": "object",
+ "javaType" : "org.apache.streams.dropwizard.StreamsDropwizardConfiguration",
+ "extends": {
+ "javaType": "io.dropwizard.Configuration",
+ "type": "object"
+ }
+}
[9/9] incubator-streams git commit: javadoc
Posted by sb...@apache.org.
javadoc
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/149eb231
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/149eb231
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/149eb231
Branch: refs/heads/STREAMS-222
Commit: 149eb23109ce4a77042c58f65ee79eee786ac9f3
Parents: 3d5f291
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:30:44 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:30:44 2014 -0600
----------------------------------------------------------------------
.../streams/datasift/provider/DatasiftPushProvider.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/149eb231/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index cce5930..dae48ac 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -58,9 +58,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
/**
- * Requires Java Version 1.7!
- * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface. The provider
- * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface, with
+ * annotations that allow it to bind as jersey resources within streams-runtime-dropwizard.
+ *
+ * Whereas GenericWebhookResource outputs ObjectNode datums, DatasiftPushProvider outputs Datasift datums, with
+ * metadata when the json_meta endpoint is used.
*/
@Resource
@Path("/streams/webhooks/datasift")
[7/9] incubator-streams git commit: added tests decided not to add
StreamsResource to core at this time
Posted by sb...@apache.org.
added tests
decided not to add StreamsResource to core at this time
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6c989cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6c989cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6c989cb6
Branch: refs/heads/STREAMS-222
Commit: 6c989cb69ea0e2013d1543e987ce6b7325e30859
Parents: 069969d
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:24:45 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:24:45 2014 -0600
----------------------------------------------------------------------
.../apache/streams/core/StreamsResource.java | 17 ----
.../test/GenericWebhookResourceTest.java | 82 ++++++++++++++++++++
.../dropwizard/test/StreamsApplicationIT.java | 30 +++++++
.../dropwizard/test/TestStreamsApplication.java | 9 +++
.../src/test/resources/configuration.yml | 27 +++++++
5 files changed, 148 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
deleted file mode 100644
index 4bd18e2..0000000
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.streams.core;
-
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.Response;
-
-/**
- * Created by sblackmon on 11/20/14.
- */
-public interface StreamsResource {
-
- public Response json(HttpHeaders headers, String body);
-
- public Response json_new_line(HttpHeaders headers, String body);
-
- public Response json_meta(HttpHeaders headers, String body);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
new file mode 100644
index 0000000..d1d02ac
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -0,0 +1,82 @@
+package org.apache.streams.dropwizard.test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import io.dropwizard.testing.junit.DropwizardAppRule;
+import io.dropwizard.testing.junit.ResourceTestRule;
+import org.apache.streams.dropwizard.GenericWebhookData;
+import org.apache.streams.dropwizard.GenericWebhookResource;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Person;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.xml.ws.Response;
+
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Calls the {@link GenericWebhookResource} endpoint methods directly and checks the
+ * HTTP status each one returns for valid and invalid payloads.
+ *
+ * Created by sblackmon on 11/21/14.
+ */
+public class GenericWebhookResourceTest {
+
+    private static final ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final GenericWebhookResource genericWebhookResource = new GenericWebhookResource();
+
+    @ClassRule
+    public static final ResourceTestRule resources = ResourceTestRule.builder()
+            .addResource(genericWebhookResource)
+            .build();
+
+    /** Malformed or non-object JSON is rejected with 400; JSON objects are accepted with 200. */
+    @Test
+    public void testPostJson() {
+        Assert.assertEquals(400, genericWebhookResource.json(null, "{").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json(null, "}").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json(null, "srg").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json(null, "123").getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json(null, "{}").getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json(null, "{\"valid\":\"true\"}").getStatus());
+    }
+
+    /** Newline-delimited payloads: one or several JSON objects give 200, non-JSON gives 400. */
+    @Test
+    public void testPostJsonNewLine() {
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{}").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json_new_line(null, "notvalid").getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}").getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{\"valid\":\"true\"}\n{\"valid\":\"true\"}\r{\"valid\":\"true\"}").getStatus());
+    }
+
+    /** A serialized {@link GenericWebhookData} wrapper carrying one data item gives 200. */
+    @Test
+    public void testPostJsonMeta() throws JsonProcessingException {
+        Assert.assertEquals(200, genericWebhookResource.json_meta(null, "{}").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json_meta(null, "notvalid").getStatus());
+        GenericWebhookData testPostJsonMeta = new GenericWebhookData()
+                .withHash("test")
+                .withDeliveredAt(DateTime.now())
+                .withCount(1)
+                .withHashType("type")
+                .withId("test");
+        List<ObjectNode> testPostJsonData = Lists.newArrayList();
+        testPostJsonData.add(mapper.createObjectNode().put("valid", "true"));
+        testPostJsonMeta.setData(testPostJsonData);
+        String testPostJsonEntity = mapper.writeValueAsString(testPostJsonMeta);
+        Assert.assertEquals(200, genericWebhookResource.json_meta(null, testPostJsonEntity).getStatus());
+
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
new file mode 100644
index 0000000..83b7b89
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
@@ -0,0 +1,30 @@
+package org.apache.streams.dropwizard.test;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+/**
+ * Tests {@link: org.apache.streams.dropwizard.StreamsApplication}
+ */
+public class StreamsApplicationIT {
+
+ @Before
+ public void setupTest() throws Exception {
+ String[] testArgs = Lists.newArrayList("server", "src/test/resources/configuration.yml").toArray(new String[2]);
+ TestStreamsApplication.main(testArgs);
+ }
+
+ @Test
+ public void testApplicationStarted() throws Exception {
+
+ final URL url = new URL("http://localhost:8003/admin/ping");
+ final String response = new BufferedReader(new InputStreamReader(url.openStream())).readLine();
+ Assert.assertEquals("pong", response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
new file mode 100644
index 0000000..7fe17cf
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
@@ -0,0 +1,9 @@
+package org.apache.streams.dropwizard.test;
+
+import org.apache.streams.dropwizard.StreamsApplication;
+
+/**
+ * This class exists to support {@link org.apache.streams.dropwizard.test.StreamsApplicationIT}.
+ *
+ * It declares no members of its own; the integration test launches the
+ * application by calling {@code TestStreamsApplication.main(...)}, which
+ * resolves to the static main inherited from {@link StreamsApplication}.
+ */
+public class TestStreamsApplication extends StreamsApplication {
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
new file mode 100644
index 0000000..e2943cd
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -0,0 +1,27 @@
+template: Hello, %s!
+defaultName: datasift
+
+server:
+ type: simple
+ applicationContextPath: /
+ adminContextPath: /admin
+ connector:
+ type: http
+ port: 8000
+
+logging:
+ level: DEBUG
+ appenders:
+ - type: console
+ threshold: ALL
+ target: stdout
+
+elasticsearch:
+ hosts:
+ - "localhost"
+ port: 9300
+ clusterName: elasticsearch
+ index: datasift_webhook
+ type: activity
+ batchSize: 100
+
[3/9] incubator-streams git commit: /streams/webhooks/* confirmed
working
Posted by sb...@apache.org.
/streams/webhooks/* confirmed working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbda9ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbda9ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbda9ed7
Branch: refs/heads/STREAMS-222
Commit: dbda9ed7075d0c65f0cd3c1db768dc35e63b22c0
Parents: e611290
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:28:31 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:28:31 2014 -0600
----------------------------------------------------------------------
.../apache/streams/datasift/DatasiftPush.json | 30 ---
streams-core/pom.xml | 5 +
.../apache/streams/core/StreamsResource.java | 17 ++
.../streams-runtime-dropwizard/pom.xml | 5 -
.../dropwizard/GenericWebhookResource.java | 217 ++++++++++++++++++
.../dropwizard/StreamDropwizardBuilder.java | 39 ++++
.../streams/dropwizard/StreamsApplication.java | 84 +++++--
.../dropwizard/StreamsDropwizardModule.java | 31 +--
.../streams/dropwizard/WebhookResource.java | 222 -------------------
.../streams/dropwizard/GenericWebhookData.json | 30 +++
10 files changed, 377 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
deleted file mode 100644
index 50e4f00..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "javaType": "org.apache.streams.datasift.DatasiftPush",
- "properties": {
- "id": {
- "type": "string"
- },
- "hash": {
- "type": "string"
- },
- "hash_type": {
- "type": "string"
- },
- "count": {
- "type": "long"
- },
- "delivered_at": {
- "type": "string",
- "format": "date-time"
- },
- "interactions": {
- "type": "array",
- "items": {
- "type": "object",
- "javaType": "org.apache.streams.datasift.Datasift"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index 9546b5f..950687f 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -36,6 +36,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
new file mode 100644
index 0000000..4bd18e2
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
@@ -0,0 +1,17 @@
+package org.apache.streams.core;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * Contract for REST resources that accept JSON payloads in three request
+ * shapes. Each method receives the request headers and the raw request body
+ * and returns the HTTP response to send back.
+ *
+ * See GenericWebhookResource for an implementation of all three shapes.
+ *
+ * Created by sblackmon on 11/20/14.
+ */
+public interface StreamsResource {
+
+    /**
+     * Handles a request whose body is treated as a single payload.
+     *
+     * @param headers headers of the incoming request
+     * @param body    raw request body
+     * @return the response to return to the caller
+     */
+    Response json(HttpHeaders headers, String body);
+
+    /**
+     * Handles a request whose body holds several payloads, one per line.
+     *
+     * @param headers headers of the incoming request
+     * @param body    raw request body
+     * @return the response to return to the caller
+     */
+    Response json_new_line(HttpHeaders headers, String body);
+
+    /**
+     * Handles a request whose body is a wrapper document carrying metadata
+     * alongside the payloads.
+     *
+     * @param headers headers of the incoming request
+     * @param body    raw request body
+     * @return the response to return to the caller
+     */
+    Response json_meta(HttpHeaders headers, String body);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index ab6e927..045fa86 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -138,11 +138,6 @@
<artifactId>streams-runtime-local</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-persist-console</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
new file mode 100644
index 0000000..56d57b0
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -0,0 +1,217 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+/**
+ * JAX-RS resource mounted at /streams/webhooks that also acts as a
+ * {@link StreamsProvider}: every payload POSTed to one of its endpoints is
+ * wrapped in a {@link StreamsDatum} and held in {@link #providerQueue} until
+ * {@link #readCurrent()} drains it.
+ *
+ * Producers and the drain both take the write lock, so an enqueue never
+ * interleaves with the copy-then-clear performed by {@link #readCurrent()}.
+ * The lock is always released in a finally block.
+ */
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+
+    public GenericWebhookResource() {
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(GenericWebhookResource.class);
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    /** Datums received since the last call to {@link #readCurrent()}. */
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+    /** Guards {@link #providerQueue} against concurrent enqueue and drain. */
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Matches \r\n, a lone \r, or a lone \n. */
+    private static final Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    /**
+     * Queues the whole request body as a single datum.
+     *
+     * @param headers request headers (not used)
+     * @param body    raw request body
+     * @return 200 with {"success": true}
+     */
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                         String body) {
+
+        enqueue(new StreamsDatum(body));
+
+        return respond(200, true);
+
+    }
+
+    /**
+     * Splits the request body on line breaks and queues each piece as its
+     * own datum. A body of exactly "{}" is acknowledged without queueing.
+     *
+     * @param headers request headers (not used)
+     * @param body    raw request body, one payload per line
+     * @return 200 with {"success": true}, or 500 with {"success": false}
+     *         if splitting or queueing throws
+     */
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                  String body) {
+
+        // Constant-first comparison: a null body falls through to the
+        // try block and is reported as a failure instead of throwing here.
+        if ("{}".equals(body)) {
+            return respond(200, true);
+        }
+
+        try {
+
+            for (String item : Splitter.on(newLinePattern).split(body)) {
+                enqueue(new StreamsDatum(item));
+            }
+
+            return respond(200, true);
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            return respond(500, false);
+
+        }
+
+    }
+
+    /**
+     * Parses the request body as a {@link GenericWebhookData} wrapper and
+     * queues each element of its data list, re-serialized to JSON, as its
+     * own datum. A body of exactly "{}" is acknowledged without queueing.
+     *
+     * @param headers request headers (not used)
+     * @param body    raw request body holding the wrapper document
+     * @return 200 with {"success": true}, or 500 with {"success": false}
+     *         if parsing, serializing or queueing throws
+     */
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                              String body) {
+
+        if ("{}".equals(body)) {
+            return respond(200, true);
+        }
+
+        try {
+
+            GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class);
+
+            for (ObjectNode item : objectWrapper.getData()) {
+                enqueue(new StreamsDatum(mapper.writeValueAsString(item)));
+            }
+
+            return respond(200, true);
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            // Same failure entity as json_new_line, so clients can rely on
+            // the "success" flag for every endpoint.
+            return respond(500, false);
+        }
+    }
+
+    /**
+     * Returns the data list carried by the given wrapper.
+     *
+     * @param wrapper parsed webhook wrapper
+     * @return the wrapper's data items
+     */
+    public List<ObjectNode> getData(GenericWebhookData wrapper) {
+        return wrapper.getData();
+    }
+
+    /** No-op. */
+    @Override
+    public void startStream() {
+        return;
+    }
+
+    /**
+     * Returns a snapshot of everything queued so far and empties the queue.
+     *
+     * @return result set backed by a copy of the queued datums
+     */
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        lock.writeLock().lock();
+        try {
+            StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            providerQueue.clear();
+            return current;
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+    }
+
+    /** Not implemented; always returns null. */
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    /** Not implemented; always returns null. */
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /** Always reports the provider as running. */
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+
+    /** No-op. */
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    /** No-op. */
+    @Override
+    public void cleanUp() {
+
+    }
+
+    /** Offers one datum to the provider queue while holding the write lock. */
+    private void enqueue(StreamsDatum datum) {
+        lock.writeLock().lock();
+        try {
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** Builds a response with the given status and a {"success": flag} entity. */
+    private static Response respond(int status, boolean success) {
+        ObjectNode response = mapper.createObjectNode();
+        response.put("success", success);
+        return Response.status(status).entity(response).build();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
new file mode 100644
index 0000000..4292900
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -0,0 +1,39 @@
+package org.apache.streams.dropwizard;
+
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+/**
+ * {@link LocalStreamBuilder} subclass used by the dropwizard runtime.
+ *
+ * Every constructor and {@link #newPerpetualStream(String, StreamsProvider)}
+ * hand straight off to the superclass; this class adds no behavior of its own.
+ *
+ * Created by sblackmon on 11/20/14.
+ */
+public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
+
+    /** Creates a builder using the superclass defaults. */
+    public StreamDropwizardBuilder() {
+    }
+
+    /**
+     * @param maxQueueCapacity queue capacity handed to the superclass
+     */
+    public StreamDropwizardBuilder(int maxQueueCapacity) {
+        super(maxQueueCapacity);
+    }
+
+    /**
+     * @param streamConfig stream configuration handed to the superclass
+     */
+    public StreamDropwizardBuilder(Map<String, Object> streamConfig) {
+        super(streamConfig);
+    }
+
+    /**
+     * @param maxQueueCapacity queue capacity handed to the superclass
+     * @param streamConfig     stream configuration handed to the superclass
+     */
+    public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) {
+        super(maxQueueCapacity, streamConfig);
+    }
+
+    /**
+     * Registers a perpetual stream by delegating to the superclass.
+     *
+     * @param streamId identifier of the stream
+     * @param provider provider feeding the stream
+     * @return whatever the superclass returns
+     */
+    @Override
+    public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) {
+        final StreamBuilder delegated = super.newPerpetualStream(streamId, provider);
+        return delegated;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 733b078..67d0446 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -6,7 +6,9 @@ import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.sun.jersey.api.core.ResourceConfig;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
@@ -14,22 +16,37 @@ import io.dropwizard.Application;
import io.dropwizard.jackson.GuavaExtrasModule;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.console.ConsolePersistWriter;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.inject.Inject;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Path;
+
public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
private static final Logger LOGGER = LoggerFactory
@@ -37,11 +54,11 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private StreamBuilder builder;
+ protected StreamBuilder builder;
- private WebhookResource webhook;
+ private static StreamsConfiguration streamsConfiguration;
- private String broadcastURI;
+ private Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet();
private Executor executor = Executors.newSingleThreadExecutor();
@@ -70,33 +87,63 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
@Override
public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
- webhook = new WebhookResource();
-
executor = Executors.newSingleThreadExecutor();
- executor.execute(new StreamsDropwizardRunner());
+ for( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) {
+ StreamsProvider provider = (StreamsProvider)resourceProviderClass.newInstance();
+ if( StreamsProvider.class.isInstance(provider))
+ resourceProviders.add(provider);
+ }
+
+ streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
+
+ builder = setup(streamsConfiguration, resourceProviders);
+
+ executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration));
// wait for streams to start up
Thread.sleep(10000);
- //environment.jersey().register(webhook);
+ for (StreamsProvider resource : resourceProviders) {
+ environment.jersey().register(resource);
+ LOGGER.info("Added resource class: {}", resource);
+ }
+
+ }
+
+ public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) {
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+ if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
+ List<String> providers = new ArrayList<>();
+ for( StreamsProvider provider: resourceProviders) {
+ String providerId = provider.getClass().getSimpleName();
+ builder.newPerpetualStream(providerId, provider);
+ providers.add(providerId);
+ }
+
+ return builder;
}
private class StreamsDropwizardRunner implements Runnable {
+ private StreamsConfiguration streamsConfiguration;
+
+ private StreamBuilder builder;
+
+ protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) {
+ this.streamsConfiguration = streamsConfiguration;
+ this.builder = builder;
+ }
+
@Override
public void run() {
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
- if(! Strings.isNullOrEmpty(broadcastURI) ) streamConfig.put("broadcastURI", broadcastURI);
- builder = new LocalStreamBuilder(1000, streamConfig);
+ builder.start();
- // prepare stream components
- builder.newPerpetualStream("webhooks", webhook);
-
- builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
}
}
@@ -104,9 +151,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
public static void main(String[] args) throws Exception
{
- StreamsApplication application = new StreamsApplication();
- if( args.length == 1 ) application.broadcastURI = args[0];
- application.run(args);
+ new StreamsApplication().run(args);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index 4264dbb..f5cd020 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -3,6 +3,7 @@ package org.apache.streams.dropwizard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
@@ -18,37 +19,13 @@ public class StreamsDropwizardModule extends AbstractModule {
@Override
protected void configure() {
- // anything you'd like to configure
+ requestStaticInjection(StreamsConfiguration.class);
}
@Provides
- public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+ @Singleton
+ public StreamsConfiguration providesStreamsConfiguration() {
return StreamsConfigurator.detectConfiguration();
}
-// private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
-//
-// // config from dropwizard
-// Config configDropwizard = null;
-// try {
-// configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
-// } catch (JsonProcessingException e) {
-// e.printStackTrace();
-// LOGGER.error("Invalid Configuration: " + streamsConfiguration);
-// }
-//
-// Config combinedConfig = configTypesafe.withFallback(configDropwizard);
-// String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
-//
-// StreamsDropwizardConfiguration combinedDropwizardConfig = null;
-// try {
-// combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
-// } catch (IOException e) {
-// e.printStackTrace();
-// LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
-// }
-//
-// return combinedDropwizardConfig;
-//
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
deleted file mode 100644
index 1f80c5c..0000000
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.dropwizard;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-@Resource
-@Path("/streams/webhooks")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class WebhookResource implements StreamsProvider {
-
- public WebhookResource() {
- }
-
- private static final Logger log = LoggerFactory
- .getLogger(WebhookResource.class);
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
-
- @POST
- @Path("json")
- public Response json(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- StreamsDatum datum = new StreamsDatum(body);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- }
-
- @POST
- @Path("json_new_line")
- public Response json_new_line(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- if (body.equalsIgnoreCase("{}")) {
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
- }
-
- try {
-
- for( String item : Splitter.on('\n').split(body)) {
- StreamsDatum datum = new StreamsDatum(item);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- }
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- } catch (Exception e) {
- log.warn(e.toString(), e);
-
- Boolean success = false;
-
- response.put("success", success);
-
- return Response.status(500).entity(response).build();
-
- }
-
- }
-
-// @POST
-// @Path("json_meta")
-// public Response json_meta(@Context HttpHeaders headers,
-// String body) {
-//
-// //log.debug(headers.toString(), headers);
-//
-// //log.debug(body.toString(), body);
-//
-// ObjectNode response = mapper.createObjectNode();
-//
-// if (body.equalsIgnoreCase("{}")) {
-//
-// Boolean success = true;
-//
-// response.put("success", success);
-//
-// return Response.status(200).entity(response).build();
-// }
-//
-// try {
-//
-// ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
-//
-// for( ObjectNode item : objectWrapper.getData()) {
-//
-// String json = mapper.writeValueAsString(item);
-//
-// StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
-//
-// lock.writeLock().lock();
-// ComponentUtils.offerUntilSuccess(datum, providerQueue);
-// lock.writeLock().unlock();
-// }
-//
-// log.info("interactionQueue: " + providerQueue.size());
-//
-// Boolean success = true;
-//
-// response.put("success", success);
-//
-// return Response.status(200).entity(response).build();
-//
-// } catch (Exception e) {
-// log.warn(e.toString(), e);
-// }
-//
-// return Response.status(500).build();
-// }
-
- @Override
- public void startStream() {
- return;
- }
-
- @Override
- public StreamsResultSet readCurrent() {
-
- StreamsResultSet current;
-
- lock.writeLock().lock();
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
- providerQueue.clear();
- lock.writeLock().unlock();
-
- return current;
-
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return false;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- public void addDatum(StreamsDatum datum) {
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- } finally {
- lock.readLock().unlock();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
new file mode 100644
index 0000000..53a26c7
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "javaType": "org.apache.streams.dropwizard.GenericWebhookData",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "hash": {
+ "type": "string"
+ },
+ "hash_type": {
+ "type": "string"
+ },
+ "count": {
+ "type": "long"
+ },
+ "delivered_at": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "data": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "javaType": "com.fasterxml.jackson.databind.node.ObjectNode"
+ }
+ }
+ }
+}
\ No newline at end of file
[5/9] incubator-streams git commit: useful to have this metadata
downstream
Posted by sb...@apache.org.
useful to have this metadata downstream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19874278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19874278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19874278
Branch: refs/heads/STREAMS-222
Commit: 1987427800b0cf90c197342ce522bab4cd72d605
Parents: dbb6952
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 17:18:42 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 17:18:42 2014 -0600
----------------------------------------------------------------------
.../datasift/provider/DatasiftPushProvider.java | 23 ++++++++++++++++++++
1 file changed, 23 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19874278/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 2ab949f..4d5bff3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -48,6 +50,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.math.BigInteger;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
@@ -171,6 +174,26 @@ public class DatasiftPushProvider implements StreamsProvider {
String json = mapper.writeValueAsString(item);
StreamsDatum datum = new StreamsDatum(json);
+ if( item.getInteraction() != null &&
+ !Strings.isNullOrEmpty(item.getInteraction().getId())) {
+ datum.setId(item.getInteraction().getId());
+ }
+ if( item.getInteraction() != null &&
+ item.getInteraction().getCreatedAt() != null) {
+ datum.setTimestamp(item.getInteraction().getCreatedAt());
+ }
+ Map<String, Object> metadata = Maps.newHashMap();
+ metadata.put("datasift.hash", objectWrapper.getHash());
+ metadata.put("datasift.hashType", objectWrapper.getHashType());
+ metadata.put("datasift.id",objectWrapper.getId());
+
+ if( item.getInteraction() != null &&
+ item.getInteraction().getTags() != null &&
+ item.getInteraction().getTags().size() > 0) {
+ metadata.put("datasift.interaction.tags", item.getInteraction().getTags());
+ }
+
+ datum.setMetadata(metadata);
lock.writeLock().lock();
ComponentUtils.offerUntilSuccess(datum, providerQueue);
[2/9] incubator-streams git commit: Merge remote-tracking branch
'apache/master' into dropwizard
Posted by sb...@apache.org.
Merge remote-tracking branch 'apache/master' into dropwizard
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6112906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6112906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6112906
Branch: refs/heads/STREAMS-222
Commit: e6112906772aa5909e09f7246005e0caef5afff5
Parents: a57c017 1b55741
Author: sblackmon <sb...@apache.org>
Authored: Wed Nov 19 16:26:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Wed Nov 19 16:26:18 2014 -0600
----------------------------------------------------------------------
.../serializer/FacebookActivityUtil.java | 2 +-
.../gplus/provider/AbstractGPlusProvider.java | 31 +++++++-----
.../gplus/provider/GPlusConfigurator.java | 32 ++++++------
.../provider/GPlusUserActivityProvider.java | 11 +++++
.../gplus/provider/GPlusUserDataProvider.java | 11 +++++
.../com/google/gplus/GPlusConfiguration.json | 17 +++----
.../provider/TestAbstractGPlusProvider.java | 7 ++-
.../tasks/BroadcastMonitorThread.java | 5 +-
.../org/apache/streams/pojo/json/Broadcast.json | 8 +++
.../local/builders/LocalStreamBuilder.java | 37 +++++++++++---
.../streams/local/builders/StreamComponent.java | 31 +++++++-----
.../local/counters/DatumStatusCounter.java | 9 +++-
.../local/counters/StreamsTaskCounter.java | 16 ++++--
.../streams/local/queues/ThroughputQueue.java | 52 +++++++++++++++++---
.../streams/local/tasks/BaseStreamsTask.java | 41 ++++++++++++++-
.../streams/local/tasks/StreamsMergeTask.java | 7 ++-
.../local/tasks/StreamsPersistWriterTask.java | 11 +++--
.../local/tasks/StreamsProcessorTask.java | 14 ++++--
.../local/tasks/StreamsProviderTask.java | 11 +++--
.../local/builders/LocalStreamBuilderTest.java | 9 ++--
.../local/counters/DatumStatusCounterTest.java | 22 ++++-----
.../local/counters/StreamsTaskCounterTest.java | 21 ++++----
.../queues/ThroughputQueueMulitThreadTest.java | 5 +-
.../queues/ThroughputQueueSingleThreadTest.java | 15 +++---
.../streams/local/tasks/BasicTasksTest.java | 6 +--
.../local/tasks/StreamsProviderTaskTest.java | 10 ++--
.../streams/storm/trident/StreamsTopology.java | 2 +-
27 files changed, 315 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
[8/9] incubator-streams git commit: more tests javadoc headers
refactor DatasiftPushProvider for compatibility
Posted by sb...@apache.org.
more tests
javadoc headers
refactor DatasiftPushProvider for compatibility
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d5f291a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d5f291a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d5f291a
Branch: refs/heads/STREAMS-222
Commit: 3d5f291a5f66f78d8728db0baf5ab6103ada42bf
Parents: 6c989cb
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:26:43 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:26:43 2014 -0600
----------------------------------------------------------------------
pom.xml | 7 +-
.../datasift/provider/DatasiftPushProvider.java | 3 +-
.../streams-runtime-dropwizard/pom.xml | 28 +++++--
.../dropwizard/GenericWebhookResource.java | 82 +++++++++++++-------
.../dropwizard/StreamDropwizardBuilder.java | 4 +-
.../streams/dropwizard/StreamsApplication.java | 19 ++++-
.../dropwizard/StreamsDropwizardModule.java | 5 +-
.../test/GenericWebhookResourceTest.java | 2 +-
.../src/test/resources/configuration.yml | 13 +---
9 files changed, 108 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 351ec68..317861f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<surefire.plugin.version>2.17</surefire.plugin.version>
<failsafe.plugin.version>2.17</failsafe.plugin.version>
<slf4j.version>1.7.6</slf4j.version>
+ <hamcrest.version>1.3</hamcrest.version>
<logback.version>1.1.1</logback.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
@@ -260,7 +261,11 @@
<artifactId>config</artifactId>
<version>${typesafe.config.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>${hamcrest.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index a363cb1..cce5930 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.DatasiftConfiguration;
@@ -67,7 +66,7 @@ import java.util.regex.Pattern;
@Path("/streams/webhooks/datasift")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
+public class DatasiftPushProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index 045fa86..c18ba9d 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -69,6 +69,19 @@
</dependency>
<dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-metrics</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-testing</artifactId>
+ <version>${dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
@@ -152,12 +165,6 @@
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
- <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -238,6 +245,15 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <!--<plugin>-->
+ <!--<groupId>org.apache.maven.plugins</groupId>-->
+ <!--<artifactId>maven-resources-plugin</artifactId>-->
+ <!--<resources>test.</resources>-->
+ <!--</plugin>-->
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
index 56d57b0..8416361 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -6,7 +6,6 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
@@ -31,11 +30,17 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
+/**
+ * GenericWebhookResource provides basic webhook connectivity.
+ *
+ * Add processors / persistWriters that read from "GenericWebhookResource" to
+ * consume data posted to streams.
+ */
@Resource
@Path("/streams/webhooks")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+public class GenericWebhookResource implements StreamsProvider {
public GenericWebhookResource() {
}
@@ -57,19 +62,35 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
String body) {
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
- StreamsDatum datum = new StreamsDatum(body);
+ try {
+ ObjectNode item = mapper.readValue(body, ObjectNode.class);
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
+ StreamsDatum datum = new StreamsDatum(body);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
- Boolean success = true;
+ Boolean success = true;
- response.put("success", success);
+ response.put("success", success);
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+
+ Boolean success = false;
+
+ response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+
+ } finally {
+ return Response.status(responseCode).entity(response).build();
+
+ }
}
@POST
@@ -78,19 +99,22 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
String body) {
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
if (body.equalsIgnoreCase("{}")) {
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
+ return Response.status(responseCode).entity(response).build();
}
try {
- for( String item : Splitter.on(newLinePattern).split(body)) {
+ for( String line : Splitter.on(newLinePattern).split(body)) {
+ ObjectNode item = mapper.readValue(line, ObjectNode.class);
+
StreamsDatum datum = new StreamsDatum(item);
lock.writeLock().lock();
@@ -102,8 +126,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
} catch (Exception e) {
log.warn(e.toString(), e);
@@ -111,8 +134,10 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = false;
response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
- return Response.status(500).entity(response).build();
+ } finally {
+ return Response.status(responseCode).entity(response).build();
}
@@ -123,19 +148,17 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
public Response json_meta(@Context HttpHeaders headers,
String body) {
- //log.debug(headers.toString(), headers);
-
- //log.debug(body.toString(), body);
-
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
if (body.equalsIgnoreCase("{}")) {
Boolean success = true;
response.put("success", success);
+ responseCode = Response.Status.OK.getStatusCode();
- return Response.status(200).entity(response).build();
+ return Response.status(responseCode).entity(response).build();
}
try {
@@ -144,9 +167,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
for( ObjectNode item : objectWrapper.getData()) {
- String json = mapper.writeValueAsString(item);
-
- StreamsDatum datum = new StreamsDatum(json);
+ StreamsDatum datum = new StreamsDatum(item);
lock.writeLock().lock();
ComponentUtils.offerUntilSuccess(datum, providerQueue);
@@ -156,18 +177,19 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
} catch (Exception e) {
log.warn(e.toString(), e);
- }
- return Response.status(500).build();
- }
+ Boolean success = false;
+
+ response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+ } finally {
+ return Response.status(responseCode).entity(response).build();
+ }
- public List<ObjectNode> getData(GenericWebhookData wrapper) {
- return wrapper.getData();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
index 4292900..524009a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -11,7 +11,9 @@ import java.math.BigInteger;
import java.util.Map;
/**
- * Created by sblackmon on 11/20/14.
+ * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder
+ *
+ * It's a separate class because they will almost certainly deviate going forward
*/
public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 67d0446..fce9852 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -1,5 +1,7 @@
package org.apache.streams.dropwizard;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
@@ -14,6 +16,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import io.dropwizard.Application;
import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import org.apache.streams.config.StreamsConfiguration;
@@ -47,12 +50,19 @@ import com.google.inject.Inject;
import javax.annotation.Resource;
import javax.ws.rs.Path;
+/**
+ * Entry point to a dropwizard streams application
+ *
+ * It will start up a stream in the local runtime, as well as bind any
+ * StreamsProvider on the classpath with a @Resource annotation.
+ *
+ */
public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
private static final Logger LOGGER = LoggerFactory
.getLogger(StreamsApplication.class);
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected StreamBuilder builder;
@@ -95,6 +105,10 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
resourceProviders.add(provider);
}
+ MetricRegistry metrics = new MetricRegistry();
+ MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory();
+ mfac.configure(environment.lifecycle(), metrics);
+
streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
builder = setup(streamsConfiguration, resourceProviders);
@@ -115,7 +129,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
Map<String, Object> streamConfig = Maps.newHashMap();
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
- if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ //if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ streamConfig.put("monitoring_broadcast_interval_ms", -1);
StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
List<String> providers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index f5cd020..1ba07c0 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -13,7 +13,10 @@ import org.apache.streams.config.StreamsConfigurator;
import java.io.IOException;
/**
- * Created by sblackmon on 11/18/14.
+ * This class exists because dropwizard-guice requires at least
+ * one module to run
+ *
+ * Do not expect @Inject StreamsConfiguration to work at the moment.
*/
public class StreamsDropwizardModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
index d1d02ac..db9133b 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -29,7 +29,7 @@ import java.util.List;
import static org.mockito.Mockito.*;
/**
- * Created by sblackmon on 11/21/14.
+ * Tests {@link org.apache.streams.dropwizard.GenericWebhookResource}
*/
public class GenericWebhookResourceTest {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
index e2943cd..778f50a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -1,5 +1,5 @@
template: Hello, %s!
-defaultName: datasift
+defaultName: streams
server:
type: simple
@@ -7,7 +7,7 @@ server:
adminContextPath: /admin
connector:
type: http
- port: 8000
+ port: 8003
logging:
level: DEBUG
@@ -16,12 +16,3 @@ logging:
threshold: ALL
target: stdout
-elasticsearch:
- hosts:
- - "localhost"
- port: 9300
- clusterName: elasticsearch
- index: datasift_webhook
- type: activity
- batchSize: 100
-
[6/9] incubator-streams git commit: useful to have this metadata
downstream
Posted by sb...@apache.org.
useful to have this metadata downstream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/069969d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/069969d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/069969d7
Branch: refs/heads/STREAMS-222
Commit: 069969d7f9496fbadc18a847648eead27396518e
Parents: 1987427
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 17:19:39 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 17:19:39 2014 -0600
----------------------------------------------------------------------
.../streams/datasift/provider/DatasiftPushProvider.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/069969d7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 4d5bff3..a363cb1 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.DatasiftConfiguration;
@@ -66,7 +67,7 @@ import java.util.regex.Pattern;
@Path("/streams/webhooks/datasift")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider {
+public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
@@ -183,14 +184,14 @@ public class DatasiftPushProvider implements StreamsProvider {
datum.setTimestamp(item.getInteraction().getCreatedAt());
}
Map<String, Object> metadata = Maps.newHashMap();
- metadata.put("datasift.hash", objectWrapper.getHash());
- metadata.put("datasift.hashType", objectWrapper.getHashType());
- metadata.put("datasift.id",objectWrapper.getId());
+ metadata.put("hash", objectWrapper.getHash());
+ metadata.put("hashType", objectWrapper.getHashType());
+ metadata.put("id",objectWrapper.getId());
if( item.getInteraction() != null &&
item.getInteraction().getTags() != null &&
item.getInteraction().getTags().size() > 0) {
- metadata.put("datasift.interaction.tags", item.getInteraction().getTags());
+ metadata.put("tags", item.getInteraction().getTags());
}
datum.setMetadata(metadata);
[4/9] incubator-streams git commit: refactored DatasiftProvider to
Resource
Posted by sb...@apache.org.
refactored DatasiftProvider to Resource
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbb69526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbb69526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbb69526
Branch: refs/heads/STREAMS-222
Commit: dbb69526e7469f6b988e9ed35793616d988102f6
Parents: dbda9ed
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:30:15 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:30:15 2014 -0600
----------------------------------------------------------------------
.../datasift/provider/DatasiftPushProvider.java | 196 ++++++++++++++-----
.../streams/datasift/DatasiftWebhookData.json | 30 +++
2 files changed, 180 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 264dbbe..2ab949f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -20,66 +20,193 @@ package org.apache.streams.datasift.provider;
import com.datasift.client.stream.DeletedInteraction;
import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.datasift.DatasiftWebhookData;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.math.BigInteger;
+import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
/**
* Requires Java Version 1.7!
* {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface. The provider
* uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
*/
+@Resource
+@Path("/streams/webhooks/datasift")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
public class DatasiftPushProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
- private DatasiftConfiguration config;
- protected Queue<StreamsDatum> providerQueue;
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public DatasiftPushProvider() {
+ private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    /**
+     * Webhook endpoint that accepts one document per request.
+     * The raw request body is wrapped in a {@link StreamsDatum} and queued until the next {@link #readCurrent()} call.
+     *
+     * @param headers HTTP headers of the request (not used by this method)
+     * @param body raw request body; queued as-is without being parsed
+     * @return HTTP 200 with the JSON entity {@code {"success": true}}
+     */
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                         String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        StreamsDatum datum = new StreamsDatum(body);
+
+        // Always release the lock: if the offer throws while the lock is held,
+        // every later request and readCurrent() call would block forever.
+        lock.writeLock().lock();
+        try {
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        response.put("success", true);
+
+        return Response.status(200).entity(response).build();
}
- @Override
- public void startStream() {
- Preconditions.checkNotNull(providerQueue);
+    /**
+     * Webhook endpoint that accepts newline-delimited documents.
+     * The body is split on line breaks and every non-empty line is queued as its own {@link StreamsDatum}.
+     *
+     * @param headers HTTP headers of the request (not used by this method)
+     * @param body raw request body, one document per line; the literal {@code {}} is acknowledged without queueing anything
+     * @return HTTP 200 with {@code {"success": true}} when every line was queued,
+     *         HTTP 500 with {@code {"success": false}} when processing failed
+     */
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                  String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        // Constant-first comparison so a null body falls through to the guarded
+        // section below instead of throwing outside the try block.
+        if ("{}".equals(body)) {
+
+            response.put("success", true);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            // omitEmptyStrings: a trailing line break or blank line must not be queued as an empty document.
+            for (String item : Splitter.on(newLinePattern).omitEmptyStrings().split(body)) {
+                StreamsDatum datum = new StreamsDatum(item);
+
+                // Release the lock even if the offer fails, so later requests and readCurrent() are not blocked.
+                lock.writeLock().lock();
+                try {
+                    ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                } finally {
+                    lock.writeLock().unlock();
+                }
+
+            }
+
+            response.put("success", true);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+
+            response.put("success", false);
+
+            return Response.status(500).entity(response).build();
+
+        }
+
}
- /**
- * Shuts down all open streams from datasift.
- */
- public void stop() {
+    /**
+     * Webhook endpoint that accepts an envelope of interactions.
+     * The body is parsed as {@link DatasiftWebhookData}; each entry returned by {@code getInteractions()}
+     * is serialized back to JSON and queued as its own {@link StreamsDatum}.
+     *
+     * @param headers HTTP headers of the request (not used by this method)
+     * @param body raw request body; the literal {@code {}} is acknowledged without queueing anything
+     * @return HTTP 200 with {@code {"success": true}} when every interaction was queued,
+     *         HTTP 500 with {@code {"success": false}} when parsing or queueing failed
+     */
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                              String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        // Constant-first comparison so a null body is handled by the guarded
+        // section below instead of throwing outside the try block.
+        if ("{}".equals(body)) {
+
+            response.put("success", true);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class);
+
+            for (Datasift item : objectWrapper.getInteractions()) {
+
+                String json = mapper.writeValueAsString(item);
+
+                StreamsDatum datum = new StreamsDatum(json);
+
+                // Release the lock even if the offer fails, so later requests and readCurrent() are not blocked.
+                lock.writeLock().lock();
+                try {
+                    ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                } finally {
+                    lock.writeLock().unlock();
+                }
+            }
+
+            response.put("success", true);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+
+            // Report the failure with the same entity shape json_new_line uses.
+            response.put("success", false);
+
+            return Response.status(500).entity(response).build();
+        }
+    }
+
+    /**
+     * Intentionally does nothing: this provider has no stream to open,
+     * documents reach the queue through the webhook endpoints of this class.
+     */
+    @Override
+    public void startStream() {
+        // no-op
}
- // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
+    /**
+     * Drains the provider queue.
+     * Copies everything currently queued into a new queue, clears the provider queue,
+     * and returns the copy wrapped in a {@link StreamsResultSet}. The copy and the clear
+     * happen under the write lock, which the webhook endpoints also take before queueing.
+     *
+     * @return a result set holding the datums that were queued at the time of the call
+     */
@Override
- //This is a hack. It is only like this because of how perpetual streams work at the moment. Read list server to debate/vote for new interfaces.
public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
- StreamsResultSet current = new StreamsResultSet(datums);
- try {
- lock.writeLock().lock();
- current = new StreamsResultSet(providerQueue);
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
+        StreamsResultSet current;
+
+        // Acquire outside the try, release in finally: a failure while copying
+        // must not leave the write lock held and block every webhook request.
+        lock.writeLock().lock();
+        try {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            providerQueue.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
return current;
+
}
@Override
@@ -87,6 +214,7 @@ public class DatasiftPushProvider implements StreamsProvider {
return null;
}
+ @Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
return null;
}
@@ -98,36 +226,12 @@ public class DatasiftPushProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
- this.providerQueue = constructQueue();
+
}
@Override
public void cleanUp() {
- stop();
- }
- public DatasiftConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(DatasiftConfiguration config) {
- this.config = config;
- }
-
- private Queue<StreamsDatum> constructQueue() {
- return Queues.newConcurrentLinkedQueue();
- }
-
- /**
- * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
- * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
- */
- public static class DeleteHandler extends StreamEventListener {
-
- public void onDelete(DeletedInteraction di) {
- //go off and delete the interaction if you have it stored. This is a strict requirement!
- LOGGER.info("DELETED:\n " + di);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
new file mode 100644
index 0000000..c4e43e8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "javaType": "org.apache.streams.datasift.DatasiftWebhookData",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "hash": {
+ "type": "string"
+ },
+ "hash_type": {
+ "type": "string"
+ },
+ "count": {
+ "type": "long"
+ },
+ "delivered_at": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "interactions": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "javaType": "org.apache.streams.datasift.Datasift"
+ }
+ }
+ }
+}
\ No newline at end of file