You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:08 UTC

[1/9] incubator-streams git commit: drop wizard runtime

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-222 [created] 149eb2310


drop wizard runtime


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a57c0172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a57c0172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a57c0172

Branch: refs/heads/STREAMS-222
Commit: a57c017211a499372fda1a511b6a7db063360f04
Parents: 91dd9a3
Author: sblackmon <sb...@apache.org>
Authored: Wed Nov 19 12:54:57 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Wed Nov 19 12:54:57 2014 -0600

----------------------------------------------------------------------
 streams-runtimes/pom.xml                        |   1 +
 .../streams-runtime-dropwizard/pom.xml          | 248 +++++++++++++++++++
 .../streams/dropwizard/StreamsApplication.java  | 112 +++++++++
 .../dropwizard/StreamsDropwizardModule.java     |  54 ++++
 .../streams/dropwizard/WebhookResource.java     | 222 +++++++++++++++++
 .../StreamsDropwizardConfiguration.json         |   8 +
 6 files changed, 645 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml
index 5d43c28..7f9f68a 100644
--- a/streams-runtimes/pom.xml
+++ b/streams-runtimes/pom.xml
@@ -33,6 +33,7 @@
     <packaging>pom</packaging>
 
     <modules>
+        <module>streams-runtime-dropwizard</module>
         <module>streams-runtime-local</module>
         <module>streams-runtime-pig</module>
         <module>streams-runtime-storm</module>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
new file mode 100644
index 0000000..ab6e927
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -0,0 +1,248 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-runtimes</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streams-runtime-dropwizard</artifactId>
+
+    <properties>
+        <dropwizard.version>0.7.1</dropwizard.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-core</artifactId>
+            <version>${dropwizard.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.module</groupId>
+                    <artifactId>jackson-module-afterburner</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.datatype</groupId>
+                    <artifactId>jackson-datatype-guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-configuration</artifactId>
+            <version>${dropwizard.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-validation</artifactId>
+            <version>${dropwizard.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-afterburner</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-guava</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.hubspot.dropwizard</groupId>
+            <artifactId>dropwizard-guice</artifactId>
+            <version>0.7.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+            <version>3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <type>jar</type>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                <groupId>javax.validation</groupId>
+                <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-console</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
+
+        <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.dropwizard</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
new file mode 100644
index 0000000..733b078
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -0,0 +1,112 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.guava.GuavaModule;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import io.dropwizard.Application;
+import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.setup.Bootstrap;
+import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.console.ConsolePersistWriter;
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/** Dropwizard application that assembles a local stream around a {@link WebhookResource} provider. */
+public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsApplication.class);
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private StreamBuilder builder;
+
+    private WebhookResource webhook;
+
+    /** Optional value copied into the stream config under "broadcastURI"; set by main. */
+    private String broadcastURI;
+
+    /** Worker for the stream; created once here instead of being replaced again inside run(). */
+    private final Executor executor = Executors.newSingleThreadExecutor();
+
+    static {
+        // extra Jackson modules for the mapper handed out by StreamsJacksonMapper.getInstance()
+        mapper.registerModule(new AfterburnerModule());
+        mapper.registerModule(new GuavaModule());
+        mapper.registerModule(new GuavaExtrasModule());
+    }
+
+    /** Adds a Guice bundle with {@link StreamsDropwizardModule} and auto-config for this package. */
+    @Override
+    public void initialize(Bootstrap<StreamsDropwizardConfiguration> bootstrap) {
+        LOGGER.info(getClass().getPackage().getName());
+
+        GuiceBundle<StreamsDropwizardConfiguration> guiceBundle =
+                GuiceBundle.<StreamsDropwizardConfiguration>newBuilder()
+                .addModule(new StreamsDropwizardModule())
+                .setConfigClass(StreamsDropwizardConfiguration.class)
+                // override and add more packages to pick up custom Resources
+                .enableAutoConfig(getClass().getPackage().getName())
+                .build();
+        bootstrap.addBundle(guiceBundle);
+    }
+
+    /** Creates the webhook provider, hands stream start-up to the worker, then pauses ten seconds. */
+    @Override
+    public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
+
+        webhook = new WebhookResource();
+        executor.execute(new StreamsDropwizardRunner());
+
+        // wait for streams to start up
+        Thread.sleep(10000);
+
+        // NOTE(review): this instance is never registered with jersey - confirm how requests reach it
+        //environment.jersey().register(webhook);
+    }
+
+    /** Declares the "webhooks" provider and the "console" writer, then starts the stream. */
+    private class StreamsDropwizardRunner implements Runnable {
+
+        @Override
+        public void run() {
+            Map<String, Object> streamConfig = Maps.newHashMap();
+            streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+            if(! Strings.isNullOrEmpty(broadcastURI) ) streamConfig.put("broadcastURI", broadcastURI);
+            builder = new LocalStreamBuilder(1000, streamConfig);
+
+            // prepare stream components
+            builder.newPerpetualStream("webhooks", webhook);
+            builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
+
+            // components are only declared above; start() is what actually runs the stream
+            builder.start();
+        }
+    }
+
+    /** Entry point: a lone argument is also kept as broadcastURI; all arguments are passed to run(args). */
+    public static void main(String[] args) throws Exception
+    {
+        StreamsApplication application = new StreamsApplication();
+        if( args.length == 1 ) application.broadcastURI = args[0];
+        application.run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
new file mode 100644
index 0000000..4264dbb
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -0,0 +1,54 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+
+import java.io.IOException;
+
+/** Guice module for the dropwizard runtime. Its one provider method takes the dropwizard
+ * configuration as a parameter but does not read it; the returned StreamsConfiguration comes
+ * from StreamsConfigurator.detectConfiguration(). The commented-out draft below is inactive. */
+public class StreamsDropwizardModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        // no explicit bindings yet; add anything you'd like to configure here
+    }
+
+    @Provides
+    public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+        return StreamsConfigurator.detectConfiguration();
+    }
+
+//    private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
+//
+//        // config from dropwizard
+//        Config configDropwizard = null;
+//        try {
+//            configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
+//        } catch (JsonProcessingException e) {
+//            e.printStackTrace();
+//            LOGGER.error("Invalid Configuration: " + streamsConfiguration);
+//        }
+//
+//        Config combinedConfig = configTypesafe.withFallback(configDropwizard);
+//        String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
+//
+//        StreamsDropwizardConfiguration combinedDropwizardConfig = null;
+//        try {
+//            combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//            LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
+//        }
+//
+//        return  combinedDropwizardConfig;
+//
+//    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
new file mode 100644
index 0000000..1f80c5c
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
@@ -0,0 +1,222 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+/**
+ * Jersey resource that accepts webhook POSTs and serves the posted bodies as a {@link StreamsProvider}.
+ *
+ * Request threads enqueue one {@link StreamsDatum} per payload under the shared read lock (the queue
+ * is a {@link ConcurrentLinkedQueue}, so concurrent offers are safe). {@link #readCurrent()} takes the
+ * write lock so its copy-and-clear cannot drop a datum that is being enqueued at the same moment.
+ */
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class WebhookResource implements StreamsProvider {
+
+    public WebhookResource() {
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(WebhookResource.class);
+
+    private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /** Matches \r\n, a lone \r, or \n, so bodies split the same way whatever the sender's line endings. */
+    private static final Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    /**
+     * Enqueues the whole request body as a single datum.
+     *
+     * @param headers request headers (not inspected)
+     * @param body raw request body
+     * @return 200 with {"success":true}
+     */
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                                  String body) {
+
+        addDatum(new StreamsDatum(body));
+
+        return respond(200, true);
+    }
+
+    /**
+     * Enqueues one datum per non-empty line of the request body.
+     *
+     * @param headers request headers (not inspected)
+     * @param body line-delimited payload; a body of exactly {} is acknowledged without enqueuing
+     * @return 200 with {"success":true}, or 500 with {"success":false} if enqueuing throws
+     */
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                           String body) {
+
+        if (body.equalsIgnoreCase("{}")) {
+            return respond(200, true);
+        }
+
+        try {
+            // split on any line-ending style and skip blank lines instead of queueing empty datums
+            for( String item : Splitter.on(newLinePattern).omitEmptyStrings().split(body)) {
+                addDatum(new StreamsDatum(item));
+            }
+            return respond(200, true);
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+            return respond(500, false);
+        }
+    }
+
+    /** Builds the acknowledgement shared by the endpoints: the given status with a "success" flag. */
+    private Response respond(int status, boolean success) {
+        ObjectNode response = mapper.createObjectNode();
+        response.put("success", success);
+        return Response.status(status).entity(response).build();
+    }
+
+//    @POST
+//    @Path("json_meta")
+//    public Response json_meta(@Context HttpHeaders headers,
+//                                       String body) {
+//
+//        //log.debug(headers.toString(), headers);
+//
+//        //log.debug(body.toString(), body);
+//
+//        ObjectNode response = mapper.createObjectNode();
+//
+//        if (body.equalsIgnoreCase("{}")) {
+//
+//            Boolean success = true;
+//
+//            response.put("success", success);
+//
+//            return Response.status(200).entity(response).build();
+//        }
+//
+//        try {
+//
+//            ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
+//
+//            for( ObjectNode item : objectWrapper.getData()) {
+//
+//                String json = mapper.writeValueAsString(item);
+//
+//                StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
+//
+//                lock.writeLock().lock();
+//                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+//                lock.writeLock().unlock();
+//            }
+//
+//            log.info("interactionQueue: " + providerQueue.size());
+//
+//            Boolean success = true;
+//
+//            response.put("success", success);
+//
+//            return Response.status(200).entity(response).build();
+//
+//        } catch (Exception e) {
+//            log.warn(e.toString(), e);
+//        }
+//
+//        return Response.status(500).build();
+//    }
+
+    @Override
+    public void startStream() {
+        // nothing to start: datums arrive through the POST endpoints
+    }
+
+    /**
+     * Hands over everything queued so far and empties the queue.
+     *
+     * @return result set wrapping a copy of the queued datums
+     */
+    @Override
+    public StreamsResultSet readCurrent() {
+        StreamsResultSet current;
+        lock.writeLock().lock();
+        try {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            providerQueue.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return current;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return false;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        // the configuration object is not used
+    }
+
+    @Override
+    public void cleanUp() {
+        // nothing is released here
+    }
+
+    /** Enqueues a datum under the shared read lock; both POST endpoints go through here. */
+    public void addDatum(StreamsDatum datum) {
+        lock.readLock().lock();
+        try {
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a57c0172/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
new file mode 100644
index 0000000..9faee44
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/StreamsDropwizardConfiguration.json
@@ -0,0 +1,8 @@
+{
+    "type": "object",
+    "javaType" : "org.apache.streams.dropwizard.StreamsDropwizardConfiguration",
+    "extends": {
+        "javaType": "io.dropwizard.Configuration",
+        "type": "object"
+    }
+}


[9/9] incubator-streams git commit: javadoc

Posted by sb...@apache.org.
javadoc


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/149eb231
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/149eb231
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/149eb231

Branch: refs/heads/STREAMS-222
Commit: 149eb23109ce4a77042c58f65ee79eee786ac9f3
Parents: 3d5f291
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:30:44 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:30:44 2014 -0600

----------------------------------------------------------------------
 .../streams/datasift/provider/DatasiftPushProvider.java      | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/149eb231/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index cce5930..dae48ac 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -58,9 +58,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
 /**
- * Requires Java Version 1.7!
- * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
- * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
+ * {@code DatasiftPushProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface, with
+ * annotations that allow it to bind as jersey resources within streams-runtime-dropwizard.
+ *
+ * Whereas GenericWebhookResource outputs ObjectNode datums, DatasiftPushProvider outputs Datasift datums, with
+ * metadata when the json_meta endpoint is used.
  */
 @Resource
 @Path("/streams/webhooks/datasift")


[7/9] incubator-streams git commit: added tests decided not to add StreamsResource to core at this time

Posted by sb...@apache.org.
added tests
decided not to add StreamsResource to core at this time


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6c989cb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6c989cb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6c989cb6

Branch: refs/heads/STREAMS-222
Commit: 6c989cb69ea0e2013d1543e987ce6b7325e30859
Parents: 069969d
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:24:45 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:24:45 2014 -0600

----------------------------------------------------------------------
 .../apache/streams/core/StreamsResource.java    | 17 ----
 .../test/GenericWebhookResourceTest.java        | 82 ++++++++++++++++++++
 .../dropwizard/test/StreamsApplicationIT.java   | 30 +++++++
 .../dropwizard/test/TestStreamsApplication.java |  9 +++
 .../src/test/resources/configuration.yml        | 27 +++++++
 5 files changed, 148 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
deleted file mode 100644
index 4bd18e2..0000000
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.streams.core;
-
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.Response;
-
-/**
- * Created by sblackmon on 11/20/14.
- */
-public interface StreamsResource {
-
-    public Response json(HttpHeaders headers, String body);
-
-    public Response json_new_line(HttpHeaders headers, String body);
-
-    public Response json_meta(HttpHeaders headers, String body);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
new file mode 100644
index 0000000..d1d02ac
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -0,0 +1,82 @@
+package org.apache.streams.dropwizard.test;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.io.Resources;
+import io.dropwizard.testing.junit.DropwizardAppRule;
+import io.dropwizard.testing.junit.ResourceTestRule;
+import org.apache.streams.dropwizard.GenericWebhookData;
+import org.apache.streams.dropwizard.GenericWebhookResource;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Person;
+import org.joda.time.DateTime;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.xml.ws.Response;
+
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for the three POST handlers of {@link GenericWebhookResource},
+ * calling them directly on a shared resource instance.
+ * Created by sblackmon on 11/21/14.
+ */
+public class GenericWebhookResourceTest {
+
+    private static ObjectMapper mapper = new StreamsJacksonMapper();
+
+    private static final GenericWebhookResource genericWebhookResource = new GenericWebhookResource();
+
+    @ClassRule
+    public static final ResourceTestRule resources = ResourceTestRule.builder()
+            .addResource(genericWebhookResource)
+            .build();
+
+    /** Malformed or non-object bodies must yield 400, JSON objects 200. */
+    @Test
+    public void testPostJson() {
+        for (String rejected : new String[] {"{", "}", "srg", "123"}) {
+            Assert.assertEquals(400, genericWebhookResource.json(null, rejected).getStatus());
+        }
+        for (String accepted : new String[] {"{}", "{\"valid\":\"true\"}"}) {
+            Assert.assertEquals(200, genericWebhookResource.json(null, accepted).getStatus());
+        }
+    }
+
+    /** Single and multi-line bodies of JSON objects must yield 200, anything else 400. */
+    @Test
+    public void testPostJsonNewLine() {
+        final String oneObject = "{\"valid\":\"true\"}";
+        final String threeObjects = "{\"valid\":\"true\"}\n{\"valid\":\"true\"}\r{\"valid\":\"true\"}";
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, "{}").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json_new_line(null, "notvalid").getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, oneObject).getStatus());
+        Assert.assertEquals(200, genericWebhookResource.json_new_line(null, threeObjects).getStatus());
+    }
+
+    /** An empty object or a populated GenericWebhookData envelope must yield 200, garbage 400. */
+    @Test
+    public void testPostJsonMeta() throws JsonProcessingException {
+        Assert.assertEquals(200, genericWebhookResource.json_meta(null, "{}").getStatus());
+        Assert.assertEquals(400, genericWebhookResource.json_meta(null, "notvalid").getStatus());
+        List<ObjectNode> documents = Lists.newArrayList();
+        documents.add(mapper.createObjectNode().put("valid", "true"));
+        GenericWebhookData envelope = new GenericWebhookData()
+                .withHash("test").withDeliveredAt(DateTime.now()).withCount(1)
+                .withHashType("type").withId("test");
+        envelope.setData(documents);
+        Assert.assertEquals(200, genericWebhookResource.json_meta(null, mapper.writeValueAsString(envelope)).getStatus());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
new file mode 100644
index 0000000..83b7b89
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/StreamsApplicationIT.java
@@ -0,0 +1,30 @@
+package org.apache.streams.dropwizard.test;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+/**
+ * Tests {@link: org.apache.streams.dropwizard.StreamsApplication}
+ */
+public class StreamsApplicationIT {
+
+    @Before
+    public void setupTest() throws Exception {
+        String[] testArgs = Lists.newArrayList("server", "src/test/resources/configuration.yml").toArray(new String[2]);
+        TestStreamsApplication.main(testArgs);
+    }
+
+    @Test
+    public void testApplicationStarted() throws Exception {
+
+        final URL url = new URL("http://localhost:8003/admin/ping");
+        final String response = new BufferedReader(new InputStreamReader(url.openStream())).readLine();
+        Assert.assertEquals("pong", response);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
new file mode 100644
index 0000000..7fe17cf
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/TestStreamsApplication.java
@@ -0,0 +1,9 @@
+package org.apache.streams.dropwizard.test;
+
+import org.apache.streams.dropwizard.StreamsApplication;
+
+/**
+ * This class exists to support {@link org.apache.streams.dropwizard.test.StreamsApplicationIT}
+ */
+public class TestStreamsApplication extends StreamsApplication {
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6c989cb6/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
new file mode 100644
index 0000000..e2943cd
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -0,0 +1,27 @@
+template: Hello, %s!
+defaultName: datasift
+
+server:
+  type: simple
+  applicationContextPath: /
+  adminContextPath: /admin
+  connector:
+    type: http
+    port: 8000
+
+logging:
+  level: DEBUG
+  appenders:
+    - type: console
+      threshold: ALL
+      target: stdout
+
+elasticsearch:
+  hosts:
+    - "localhost"
+  port: 9300
+  clusterName: elasticsearch
+  index: datasift_webhook
+  type: activity
+  batchSize: 100
+


[3/9] incubator-streams git commit: /streams/webhooks/* confirmed working

Posted by sb...@apache.org.
/streams/webhooks/* confirmed working


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbda9ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbda9ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbda9ed7

Branch: refs/heads/STREAMS-222
Commit: dbda9ed7075d0c65f0cd3c1db768dc35e63b22c0
Parents: e611290
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:28:31 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:28:31 2014 -0600

----------------------------------------------------------------------
 .../apache/streams/datasift/DatasiftPush.json   |  30 ---
 streams-core/pom.xml                            |   5 +
 .../apache/streams/core/StreamsResource.java    |  17 ++
 .../streams-runtime-dropwizard/pom.xml          |   5 -
 .../dropwizard/GenericWebhookResource.java      | 217 ++++++++++++++++++
 .../dropwizard/StreamDropwizardBuilder.java     |  39 ++++
 .../streams/dropwizard/StreamsApplication.java  |  84 +++++--
 .../dropwizard/StreamsDropwizardModule.java     |  31 +--
 .../streams/dropwizard/WebhookResource.java     | 222 -------------------
 .../streams/dropwizard/GenericWebhookData.json  |  30 +++
 10 files changed, 377 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
deleted file mode 100644
index 50e4f00..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-    "type": "object",
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "javaType": "org.apache.streams.datasift.DatasiftPush",
-    "properties": {
-        "id": {
-            "type": "string"
-        },
-        "hash": {
-            "type": "string"
-        },
-        "hash_type": {
-            "type": "string"
-        },
-        "count": {
-            "type": "long"
-        },
-        "delivered_at": {
-            "type": "string",
-            "format": "date-time"
-        },
-        "interactions": {
-            "type": "array",
-            "items": {
-                "type": "object",
-                "javaType": "org.apache.streams.datasift.Datasift"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index 9546b5f..950687f 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -36,6 +36,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>jsr311-api</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
new file mode 100644
index 0000000..4bd18e2
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
@@ -0,0 +1,17 @@
+package org.apache.streams.core;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * Entry points a webhook resource exposes. What each one does with the body is
+ * up to the implementation. Created by sblackmon on 11/20/14.
+ */
+public interface StreamsResource {
+
+    Response json(HttpHeaders headers, String body);
+
+    Response json_new_line(HttpHeaders headers, String body);
+
+    Response json_meta(HttpHeaders headers, String body);
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index ab6e927..045fa86 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -138,11 +138,6 @@
             <artifactId>streams-runtime-local</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-persist-console</artifactId>
-            <version>${project.version}</version>
-        </dependency>
 
         <dependency>
             <groupId>ch.qos.logback</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
new file mode 100644
index 0000000..56d57b0
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -0,0 +1,217 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+
+    public GenericWebhookResource() {
+    }
+
+    private static final Logger log = LoggerFactory
+            .getLogger(GenericWebhookResource.class);
+
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                                  String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        StreamsDatum datum = new StreamsDatum(body);
+
+        lock.writeLock().lock();
+        ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        lock.writeLock().unlock();
+
+        Boolean success = true;
+
+        response.put("success", success);
+
+        return Response.status(200).entity(response).build();
+
+    }
+
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                           String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            for( String item : Splitter.on(newLinePattern).split(body)) {
+                StreamsDatum datum = new StreamsDatum(item);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+
+            return Response.status(500).entity(response).build();
+
+        }
+
+    }
+
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                                       String body) {
+
+        //log.debug(headers.toString(), headers);
+
+        //log.debug(body.toString(), body);
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class);
+
+            for( ObjectNode item : objectWrapper.getData()) {
+
+                String json = mapper.writeValueAsString(item);
+
+                StreamsDatum datum = new StreamsDatum(json);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+        }
+
+        return Response.status(500).build();
+    }
+
+    public List<ObjectNode> getData(GenericWebhookData wrapper) {
+        return wrapper.getData();
+    }
+
+    @Override
+    public void startStream() {
+        return;
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        lock.writeLock().lock();
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+        providerQueue.clear();
+        lock.writeLock().unlock();
+
+        return current;
+
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return true;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
new file mode 100644
index 0000000..4292900
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -0,0 +1,39 @@
+package org.apache.streams.dropwizard;
+
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+/**
+ * LocalStreamBuilder for the dropwizard runtime. It adds no behavior of its own:
+ * each constructor and newPerpetualStream forwards straight to the superclass.
+ * Created by sblackmon on 11/20/14.
+ */
+public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
+
+    public StreamDropwizardBuilder() {
+    }
+
+    public StreamDropwizardBuilder(int maxQueueCapacity) {
+        super(maxQueueCapacity);
+    }
+
+    public StreamDropwizardBuilder(Map<String, Object> streamConfig) {
+        super(streamConfig);
+    }
+
+    public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) {
+        super(maxQueueCapacity, streamConfig);
+    }
+
+    @Override
+    public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) {
+        return super.newPerpetualStream(streamId, provider);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 733b078..67d0446 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -6,7 +6,9 @@ import com.fasterxml.jackson.datatype.guava.GuavaModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.sun.jersey.api.core.ResourceConfig;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
@@ -14,22 +16,37 @@ import io.dropwizard.Application;
 import io.dropwizard.jackson.GuavaExtrasModule;
 import io.dropwizard.setup.Bootstrap;
 import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.console.ConsolePersistWriter;
 import org.apache.streams.core.StreamBuilder;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.local.builders.LocalStreamBuilder;
 import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.inject.Inject;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Path;
+
 public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
 
     private static final Logger LOGGER = LoggerFactory
@@ -37,11 +54,11 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
 
     private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private StreamBuilder builder;
+    protected StreamBuilder builder;
 
-    private WebhookResource webhook;
+    private static StreamsConfiguration streamsConfiguration;
 
-    private String broadcastURI;
+    private Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet();
 
     private Executor executor = Executors.newSingleThreadExecutor();
 
@@ -70,33 +87,63 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
     @Override
     public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
 
-        webhook = new WebhookResource();
-
         executor = Executors.newSingleThreadExecutor();
 
-        executor.execute(new StreamsDropwizardRunner());
+        for( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) {
+            // check the type before instantiating: casting first throws ClassCastException for any other resource class
+            if( StreamsProvider.class.isAssignableFrom(resourceProviderClass))
+                resourceProviders.add((StreamsProvider)resourceProviderClass.newInstance());
+        }
+        // derive the streams configuration from the dropwizard configuration
+        streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
+
+        builder = setup(streamsConfiguration, resourceProviders);
+        // start the stream on the background executor
+        executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration));
 
         // wait for streams to start up
         Thread.sleep(10000);
 
-        //environment.jersey().register(webhook);
+        for (StreamsProvider resource : resourceProviders) {
+            environment.jersey().register(resource);
+            LOGGER.info("Added resource class: {}", resource);
+        }
+
+    }
+
+    /**
+     * Creates the stream builder and registers each resource provider as a perpetual
+     * stream, using the provider's simple class name as the stream id.
+     */
+    public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) {
+
+        Map<String, Object> streamConfig = Maps.newHashMap();
+        streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+        if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
+
+        for( StreamsProvider provider: resourceProviders) {
+            builder.newPerpetualStream(provider.getClass().getSimpleName(), provider);
+        }
+        return builder;
     }
 
     private class StreamsDropwizardRunner implements Runnable {
 
+        private StreamsConfiguration streamsConfiguration;
+
+        private StreamBuilder builder;
+
+        protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) {
+            this.streamsConfiguration = streamsConfiguration;
+            this.builder = builder;
+        }
+
         @Override
         public void run() {
 
-            Map<String, Object> streamConfig = Maps.newHashMap();
-            streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
-            if(! Strings.isNullOrEmpty(broadcastURI) ) streamConfig.put("broadcastURI", broadcastURI);
-            builder = new LocalStreamBuilder(1000, streamConfig);
+            builder.start();
 
-            // prepare stream components
-            builder.newPerpetualStream("webhooks", webhook);
-
-            builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
         }
     }
 
@@ -104,9 +151,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
     public static void main(String[] args) throws Exception
     {
 
-        StreamsApplication application = new StreamsApplication();
-        if( args.length == 1 ) application.broadcastURI = args[0];
-        application.run(args);
+        new StreamsApplication().run(args);
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index 4264dbb..f5cd020 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -3,6 +3,7 @@ package org.apache.streams.dropwizard;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
@@ -18,37 +19,13 @@ public class StreamsDropwizardModule extends AbstractModule {
 
     @Override
     protected void configure() {
-        // anything you'd like to configure
+        requestStaticInjection(StreamsConfiguration.class);
     }
 
     @Provides
-    public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+    @Singleton
+    public StreamsConfiguration providesStreamsConfiguration() {
         return StreamsConfigurator.detectConfiguration();
     }
 
-//    private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
-//
-//        // config from dropwizard
-//        Config configDropwizard = null;
-//        try {
-//            configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
-//        } catch (JsonProcessingException e) {
-//            e.printStackTrace();
-//            LOGGER.error("Invalid Configuration: " + streamsConfiguration);
-//        }
-//
-//        Config combinedConfig = configTypesafe.withFallback(configDropwizard);
-//        String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
-//
-//        StreamsDropwizardConfiguration combinedDropwizardConfig = null;
-//        try {
-//            combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
-//        } catch (IOException e) {
-//            e.printStackTrace();
-//            LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
-//        }
-//
-//        return  combinedDropwizardConfig;
-//
-//    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
deleted file mode 100644
index 1f80c5c..0000000
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.dropwizard;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-@Resource
-@Path("/streams/webhooks")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class WebhookResource implements StreamsProvider {
-
-    public WebhookResource() {
-    }
-
-    private static final Logger log = LoggerFactory
-            .getLogger(WebhookResource.class);
-
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
-
-    @POST
-    @Path("json")
-    public Response json(@Context HttpHeaders headers,
-                                  String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        StreamsDatum datum = new StreamsDatum(body);
-
-        lock.writeLock().lock();
-        ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        lock.writeLock().unlock();
-
-        Boolean success = true;
-
-        response.put("success", success);
-
-        return Response.status(200).entity(response).build();
-
-    }
-
-    @POST
-    @Path("json_new_line")
-    public Response json_new_line(@Context HttpHeaders headers,
-                                           String body) {
-
-        ObjectNode response = mapper.createObjectNode();
-
-        if (body.equalsIgnoreCase("{}")) {
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-        }
-
-        try {
-
-            for( String item : Splitter.on('\n').split(body)) {
-                StreamsDatum datum = new StreamsDatum(item);
-
-                lock.writeLock().lock();
-                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-                lock.writeLock().unlock();
-
-            }
-
-            Boolean success = true;
-
-            response.put("success", success);
-
-            return Response.status(200).entity(response).build();
-
-        } catch (Exception e) {
-            log.warn(e.toString(), e);
-
-            Boolean success = false;
-
-            response.put("success", success);
-
-            return Response.status(500).entity(response).build();
-
-        }
-
-    }
-
-//    @POST
-//    @Path("json_meta")
-//    public Response json_meta(@Context HttpHeaders headers,
-//                                       String body) {
-//
-//        //log.debug(headers.toString(), headers);
-//
-//        //log.debug(body.toString(), body);
-//
-//        ObjectNode response = mapper.createObjectNode();
-//
-//        if (body.equalsIgnoreCase("{}")) {
-//
-//            Boolean success = true;
-//
-//            response.put("success", success);
-//
-//            return Response.status(200).entity(response).build();
-//        }
-//
-//        try {
-//
-//            ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
-//
-//            for( ObjectNode item : objectWrapper.getData()) {
-//
-//                String json = mapper.writeValueAsString(item);
-//
-//                StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
-//
-//                lock.writeLock().lock();
-//                ComponentUtils.offerUntilSuccess(datum, providerQueue);
-//                lock.writeLock().unlock();
-//            }
-//
-//            log.info("interactionQueue: " + providerQueue.size());
-//
-//            Boolean success = true;
-//
-//            response.put("success", success);
-//
-//            return Response.status(200).entity(response).build();
-//
-//        } catch (Exception e) {
-//            log.warn(e.toString(), e);
-//        }
-//
-//        return Response.status(500).build();
-//    }
-
-    @Override
-    public void startStream() {
-        return;
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        lock.writeLock().lock();
-        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-        providerQueue.clear();
-        lock.writeLock().unlock();
-
-        return current;
-
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return false;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-    public void addDatum(StreamsDatum datum) {
-        try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
new file mode 100644
index 0000000..53a26c7
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
@@ -0,0 +1,30 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "javaType": "org.apache.streams.dropwizard.GenericWebhookData",
+    "properties": {
+        "id": {
+            "type": "string"
+        },
+        "hash": {
+            "type": "string"
+        },
+        "hash_type": {
+            "type": "string"
+        },
+        "count": {
+            "type": "long"
+        },
+        "delivered_at": {
+            "type": "string",
+            "format": "date-time"
+        },
+        "data": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "com.fasterxml.jackson.databind.node.ObjectNode"
+            }
+        }
+    }
+}
\ No newline at end of file


[5/9] incubator-streams git commit: useful to have this metadata downstream

Posted by sb...@apache.org.
useful to have this metadata downstream


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19874278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19874278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19874278

Branch: refs/heads/STREAMS-222
Commit: 1987427800b0cf90c197342ce522bab4cd72d605
Parents: dbb6952
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 17:18:42 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 17:18:42 2014 -0600

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 23 ++++++++++++++++++++
 1 file changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19874278/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 2ab949f..4d5bff3 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -48,6 +50,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.math.BigInteger;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -171,6 +174,26 @@ public class DatasiftPushProvider implements StreamsProvider {
                 String json = mapper.writeValueAsString(item);
 
                 StreamsDatum datum = new StreamsDatum(json);
+                if( item.getInteraction() != null &&
+                    !Strings.isNullOrEmpty(item.getInteraction().getId())) {
+                    datum.setId(item.getInteraction().getId());
+                }
+                if( item.getInteraction() != null &&
+                    item.getInteraction().getCreatedAt() != null) {
+                    datum.setTimestamp(item.getInteraction().getCreatedAt());
+                }
+                Map<String, Object> metadata = Maps.newHashMap();
+                metadata.put("datasift.hash", objectWrapper.getHash());
+                metadata.put("datasift.hashType", objectWrapper.getHashType());
+                metadata.put("datasift.id",objectWrapper.getId());
+
+                if( item.getInteraction() != null &&
+                        item.getInteraction().getTags() != null &&
+                        item.getInteraction().getTags().size() > 0) {
+                    metadata.put("datasift.interaction.tags", item.getInteraction().getTags());
+                }
+
+                datum.setMetadata(metadata);
 
                 lock.writeLock().lock();
                 ComponentUtils.offerUntilSuccess(datum, providerQueue);


[2/9] incubator-streams git commit: Merge remote-tracking branch 'apache/master' into dropwizard

Posted by sb...@apache.org.
Merge remote-tracking branch 'apache/master' into dropwizard


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e6112906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e6112906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e6112906

Branch: refs/heads/STREAMS-222
Commit: e6112906772aa5909e09f7246005e0caef5afff5
Parents: a57c017 1b55741
Author: sblackmon <sb...@apache.org>
Authored: Wed Nov 19 16:26:18 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Wed Nov 19 16:26:18 2014 -0600

----------------------------------------------------------------------
 .../serializer/FacebookActivityUtil.java        |  2 +-
 .../gplus/provider/AbstractGPlusProvider.java   | 31 +++++++-----
 .../gplus/provider/GPlusConfigurator.java       | 32 ++++++------
 .../provider/GPlusUserActivityProvider.java     | 11 +++++
 .../gplus/provider/GPlusUserDataProvider.java   | 11 +++++
 .../com/google/gplus/GPlusConfiguration.json    | 17 +++----
 .../provider/TestAbstractGPlusProvider.java     |  7 ++-
 .../tasks/BroadcastMonitorThread.java           |  5 +-
 .../org/apache/streams/pojo/json/Broadcast.json |  8 +++
 .../local/builders/LocalStreamBuilder.java      | 37 +++++++++++---
 .../streams/local/builders/StreamComponent.java | 31 +++++++-----
 .../local/counters/DatumStatusCounter.java      |  9 +++-
 .../local/counters/StreamsTaskCounter.java      | 16 ++++--
 .../streams/local/queues/ThroughputQueue.java   | 52 +++++++++++++++++---
 .../streams/local/tasks/BaseStreamsTask.java    | 41 ++++++++++++++-
 .../streams/local/tasks/StreamsMergeTask.java   |  7 ++-
 .../local/tasks/StreamsPersistWriterTask.java   | 11 +++--
 .../local/tasks/StreamsProcessorTask.java       | 14 ++++--
 .../local/tasks/StreamsProviderTask.java        | 11 +++--
 .../local/builders/LocalStreamBuilderTest.java  |  9 ++--
 .../local/counters/DatumStatusCounterTest.java  | 22 ++++-----
 .../local/counters/StreamsTaskCounterTest.java  | 21 ++++----
 .../queues/ThroughputQueueMulitThreadTest.java  |  5 +-
 .../queues/ThroughputQueueSingleThreadTest.java | 15 +++---
 .../streams/local/tasks/BasicTasksTest.java     |  6 +--
 .../local/tasks/StreamsProviderTaskTest.java    | 10 ++--
 .../streams/storm/trident/StreamsTopology.java  |  2 +-
 27 files changed, 315 insertions(+), 128 deletions(-)
----------------------------------------------------------------------



[8/9] incubator-streams git commit: more tests javadoc headers refactor DatasiftPushProvider for compatibility

Posted by sb...@apache.org.
more tests
javadoc headers
refactor DatasiftPushProvider for compatibility


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d5f291a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d5f291a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d5f291a

Branch: refs/heads/STREAMS-222
Commit: 3d5f291a5f66f78d8728db0baf5ab6103ada42bf
Parents: 6c989cb
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:26:43 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:26:43 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  7 +-
 .../datasift/provider/DatasiftPushProvider.java |  3 +-
 .../streams-runtime-dropwizard/pom.xml          | 28 +++++--
 .../dropwizard/GenericWebhookResource.java      | 82 +++++++++++++-------
 .../dropwizard/StreamDropwizardBuilder.java     |  4 +-
 .../streams/dropwizard/StreamsApplication.java  | 19 ++++-
 .../dropwizard/StreamsDropwizardModule.java     |  5 +-
 .../test/GenericWebhookResourceTest.java        |  2 +-
 .../src/test/resources/configuration.yml        | 13 +---
 9 files changed, 108 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 351ec68..317861f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
         <surefire.plugin.version>2.17</surefire.plugin.version>
         <failsafe.plugin.version>2.17</failsafe.plugin.version>
         <slf4j.version>1.7.6</slf4j.version>
+        <hamcrest.version>1.3</hamcrest.version>
         <logback.version>1.1.1</logback.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
@@ -260,7 +261,11 @@
                 <artifactId>config</artifactId>
                 <version>${typesafe.config.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-all</artifactId>
+                <version>${hamcrest.version}</version>
+            </dependency>
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index a363cb1..cce5930 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.DatasiftConfiguration;
@@ -67,7 +66,7 @@ import java.util.regex.Pattern;
 @Path("/streams/webhooks/datasift")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
+public class DatasiftPushProvider implements StreamsProvider {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index 045fa86..c18ba9d 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -69,6 +69,19 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-metrics</artifactId>
+            <version>${dropwizard.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-testing</artifactId>
+            <version>${dropwizard.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <version>${jackson.version}</version>
@@ -152,12 +165,6 @@
             <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
 
-        <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
@@ -238,6 +245,15 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+            <!--<plugin>-->
+                <!--<groupId>org.apache.maven.plugins</groupId>-->
+                <!--<artifactId>maven-resources-plugin</artifactId>-->
+                <!--<resources>test.</resources>-->
+            <!--</plugin>-->
         </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
index 56d57b0..8416361 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -6,7 +6,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
@@ -31,11 +30,17 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+/**
+ * GenericWebhookResource provides basic webhook connectivity.
+ *
+ * Add processors / persistWriters that read from "GenericWebhookResource" to
+ * consume data posted to streams.
+ */
 @Resource
 @Path("/streams/webhooks")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+public class GenericWebhookResource implements StreamsProvider {
 
     public GenericWebhookResource() {
     }
@@ -57,19 +62,35 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
                                   String body) {
 
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
-        StreamsDatum datum = new StreamsDatum(body);
+        try {
+            ObjectNode item = mapper.readValue(body, ObjectNode.class);
 
-        lock.writeLock().lock();
-        ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        lock.writeLock().unlock();
+            StreamsDatum datum = new StreamsDatum(body);
+
+            lock.writeLock().lock();
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+            lock.writeLock().unlock();
 
-        Boolean success = true;
+            Boolean success = true;
 
-        response.put("success", success);
+            response.put("success", success);
 
-        return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+
+        } finally {
+            return Response.status(responseCode).entity(response).build();
+
+        }
     }
 
     @POST
@@ -78,19 +99,22 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
                                            String body) {
 
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
         if (body.equalsIgnoreCase("{}")) {
 
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
+            return Response.status(responseCode).entity(response).build();
         }
 
         try {
 
-            for( String item : Splitter.on(newLinePattern).split(body)) {
+            for( String line : Splitter.on(newLinePattern).split(body)) {
+                ObjectNode item = mapper.readValue(line, ObjectNode.class);
+
                 StreamsDatum datum = new StreamsDatum(item);
 
                 lock.writeLock().lock();
@@ -102,8 +126,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
         } catch (Exception e) {
             log.warn(e.toString(), e);
@@ -111,8 +134,10 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = false;
 
             response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
-            return Response.status(500).entity(response).build();
+        } finally {
+            return Response.status(responseCode).entity(response).build();
 
         }
 
@@ -123,19 +148,17 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
     public Response json_meta(@Context HttpHeaders headers,
                                        String body) {
 
-        //log.debug(headers.toString(), headers);
-
-        //log.debug(body.toString(), body);
-
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
         if (body.equalsIgnoreCase("{}")) {
 
             Boolean success = true;
 
             response.put("success", success);
+            responseCode = Response.Status.OK.getStatusCode();
 
-            return Response.status(200).entity(response).build();
+            return Response.status(responseCode).entity(response).build();
         }
 
         try {
@@ -144,9 +167,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
 
             for( ObjectNode item : objectWrapper.getData()) {
 
-                String json = mapper.writeValueAsString(item);
-
-                StreamsDatum datum = new StreamsDatum(json);
+                StreamsDatum datum = new StreamsDatum(item);
 
                 lock.writeLock().lock();
                 ComponentUtils.offerUntilSuccess(datum, providerQueue);
@@ -156,18 +177,19 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
         } catch (Exception e) {
             log.warn(e.toString(), e);
-        }
 
-        return Response.status(500).build();
-    }
+            Boolean success = false;
+
+            response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+        } finally {
+            return Response.status(responseCode).entity(response).build();
+        }
 
-    public List<ObjectNode> getData(GenericWebhookData wrapper) {
-        return wrapper.getData();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
index 4292900..524009a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -11,7 +11,9 @@ import java.math.BigInteger;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 11/20/14.
+ * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder
+ *
+ * It's a separate class because they will almost certainly deviate going forward
  */
 public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 67d0446..fce9852 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -1,5 +1,7 @@
 package org.apache.streams.dropwizard;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.guava.GuavaModule;
@@ -14,6 +16,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
 import io.dropwizard.Application;
 import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.metrics.MetricsFactory;
 import io.dropwizard.setup.Bootstrap;
 import io.dropwizard.setup.Environment;
 import org.apache.streams.config.StreamsConfiguration;
@@ -47,12 +50,19 @@ import com.google.inject.Inject;
 import javax.annotation.Resource;
 import javax.ws.rs.Path;
 
+/**
+ * Entry point to a dropwizard streams application
+ *
+ * It will start up a stream in the local runtime, as well as bind any
+ * StreamsProvider on the classpath with a @Resource annotation.
+ *
+ */
 public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
 
     private static final Logger LOGGER = LoggerFactory
 			.getLogger(StreamsApplication.class);
 
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected StreamBuilder builder;
 
@@ -95,6 +105,10 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
                 resourceProviders.add(provider);
         }
 
+        MetricRegistry metrics = new MetricRegistry();
+        MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory();
+        mfac.configure(environment.lifecycle(), metrics);
+
         streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
 
         builder = setup(streamsConfiguration, resourceProviders);
@@ -115,7 +129,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
 
         Map<String, Object> streamConfig = Maps.newHashMap();
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
-        if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        //if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        streamConfig.put("monitoring_broadcast_interval_ms", -1);
         StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
 
         List<String> providers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index f5cd020..1ba07c0 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -13,7 +13,10 @@ import org.apache.streams.config.StreamsConfigurator;
 import java.io.IOException;
 
 /**
- * Created by sblackmon on 11/18/14.
+ * This class exists because dropwizard-guice requires at least
+ * one module to run
+ *
+ * Do not expect @Inject StreamsConfiguration to work at the moment.
  */
 public class StreamsDropwizardModule extends AbstractModule {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
index d1d02ac..db9133b 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -29,7 +29,7 @@ import java.util.List;
 import static org.mockito.Mockito.*;
 
 /**
- * Created by sblackmon on 11/21/14.
+ * Tests {@link org.apache.streams.dropwizard.GenericWebhookResource}
  */
 public class GenericWebhookResourceTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
index e2943cd..778f50a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -1,5 +1,5 @@
 template: Hello, %s!
-defaultName: datasift
+defaultName: streams
 
 server:
   type: simple
@@ -7,7 +7,7 @@ server:
   adminContextPath: /admin
   connector:
     type: http
-    port: 8000
+    port: 8003
 
 logging:
   level: DEBUG
@@ -16,12 +16,3 @@ logging:
       threshold: ALL
       target: stdout
 
-elasticsearch:
-  hosts:
-    - "localhost"
-  port: 9300
-  clusterName: elasticsearch
-  index: datasift_webhook
-  type: activity
-  batchSize: 100
-


[6/9] incubator-streams git commit: useful to have this metadata downstream

Posted by sb...@apache.org.
useful to have this metadata downstream


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/069969d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/069969d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/069969d7

Branch: refs/heads/STREAMS-222
Commit: 069969d7f9496fbadc18a847648eead27396518e
Parents: 1987427
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 17:19:39 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 17:19:39 2014 -0600

----------------------------------------------------------------------
 .../streams/datasift/provider/DatasiftPushProvider.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/069969d7/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 4d5bff3..a363cb1 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.DatasiftConfiguration;
@@ -66,7 +67,7 @@ import java.util.regex.Pattern;
 @Path("/streams/webhooks/datasift")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider {
+public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
@@ -183,14 +184,14 @@ public class DatasiftPushProvider implements StreamsProvider {
                     datum.setTimestamp(item.getInteraction().getCreatedAt());
                 }
                 Map<String, Object> metadata = Maps.newHashMap();
-                metadata.put("datasift.hash", objectWrapper.getHash());
-                metadata.put("datasift.hashType", objectWrapper.getHashType());
-                metadata.put("datasift.id",objectWrapper.getId());
+                metadata.put("hash", objectWrapper.getHash());
+                metadata.put("hashType", objectWrapper.getHashType());
+                metadata.put("id",objectWrapper.getId());
 
                 if( item.getInteraction() != null &&
                         item.getInteraction().getTags() != null &&
                         item.getInteraction().getTags().size() > 0) {
-                    metadata.put("datasift.interaction.tags", item.getInteraction().getTags());
+                    metadata.put("tags", item.getInteraction().getTags());
                 }
 
                 datum.setMetadata(metadata);


[4/9] incubator-streams git commit: refactored DatasiftProvider to Resource

Posted by sb...@apache.org.
refactored DatasiftProvider to Resource


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbb69526
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbb69526
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbb69526

Branch: refs/heads/STREAMS-222
Commit: dbb69526e7469f6b988e9ed35793616d988102f6
Parents: dbda9ed
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:30:15 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:30:15 2014 -0600

----------------------------------------------------------------------
 .../datasift/provider/DatasiftPushProvider.java | 196 ++++++++++++++-----
 .../streams/datasift/DatasiftWebhookData.json   |  30 +++
 2 files changed, 180 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index 264dbbe..2ab949f 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -20,66 +20,193 @@ package org.apache.streams.datasift.provider;
 
 import com.datasift.client.stream.DeletedInteraction;
 import com.datasift.client.stream.StreamEventListener;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.DatasiftConfiguration;
+import org.apache.streams.datasift.DatasiftWebhookData;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.math.BigInteger;
+import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 /**
  * Requires Java Version 1.7!
  * {@code DatasiftStreamProvider} is an implementation of the {@link org.apache.streams.core.StreamsProvider} interface.  The provider
  * uses the Datasift java api to make connections. A single provider creates one connection per StreamHash in the configuration.
  */
+@Resource
+@Path("/streams/webhooks/datasift")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
 public class DatasiftPushProvider implements StreamsProvider {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 
-    private DatasiftConfiguration config;
-    protected Queue<StreamsDatum> providerQueue;
+    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
 
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public DatasiftPushProvider() {
+    private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+    @POST
+    @Path("json")
+    public Response json(@Context HttpHeaders headers,
+                         String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        StreamsDatum datum = new StreamsDatum(body);
+
+        lock.writeLock().lock();
+        ComponentUtils.offerUntilSuccess(datum, providerQueue);
+        lock.writeLock().unlock();
+
+        Boolean success = true;
+
+        response.put("success", success);
+
+        return Response.status(200).entity(response).build();
 
     }
 
-    @Override
-    public void startStream() {
-        Preconditions.checkNotNull(providerQueue);
+    @POST
+    @Path("json_new_line")
+    public Response json_new_line(@Context HttpHeaders headers,
+                                  String body) {
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            for( String item : Splitter.on(newLinePattern).split(body)) {
+                StreamsDatum datum = new StreamsDatum(item);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+
+            return Response.status(500).entity(response).build();
+
+        }
+
     }
 
-    /**
-     * Shuts down all open streams from datasift.
-     */
-    public void stop() {
+    @POST
+    @Path("json_meta")
+    public Response json_meta(@Context HttpHeaders headers,
+                              String body) {
+
+        //log.debug(headers.toString(), headers);
+
+        //log.debug(body.toString(), body);
+
+        ObjectNode response = mapper.createObjectNode();
+
+        if (body.equalsIgnoreCase("{}")) {
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+        }
+
+        try {
+
+            DatasiftWebhookData objectWrapper = mapper.readValue(body, DatasiftWebhookData.class);
+
+            for( Datasift item : objectWrapper.getInteractions()) {
+
+                String json = mapper.writeValueAsString(item);
+
+                StreamsDatum datum = new StreamsDatum(json);
+
+                lock.writeLock().lock();
+                ComponentUtils.offerUntilSuccess(datum, providerQueue);
+                lock.writeLock().unlock();
+            }
+
+            Boolean success = true;
+
+            response.put("success", success);
+
+            return Response.status(200).entity(response).build();
+
+        } catch (Exception e) {
+            LOGGER.warn(e.toString(), e);
+        }
+
+        return Response.status(500).build();
+    }
+
+    @Override
+    public void startStream() {
+        return;
     }
 
-    // PRIME EXAMPLE OF WHY WE NEED NEW INTERFACES FOR PROVIDERS
     @Override
-    //This is a hack.  It is only like this because of how perpetual streams work at the moment.  Read list server to debate/vote for new interfaces.
     public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
 
-        StreamsResultSet current = new StreamsResultSet(datums);
-        try {
-            lock.writeLock().lock();
-            current = new StreamsResultSet(providerQueue);
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        StreamsResultSet current;
+
+        lock.writeLock().lock();
+        current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+        providerQueue.clear();
+        lock.writeLock().unlock();
 
         return current;
+
     }
 
     @Override
@@ -87,6 +214,7 @@ public class DatasiftPushProvider implements StreamsProvider {
         return null;
     }
 
+    @Override
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         return null;
     }
@@ -98,36 +226,12 @@ public class DatasiftPushProvider implements StreamsProvider {
 
     @Override
     public void prepare(Object configurationObject) {
-        this.providerQueue = constructQueue();
+
     }
 
     @Override
     public void cleanUp() {
-        stop();
-    }
 
-    public DatasiftConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(DatasiftConfiguration config) {
-        this.config = config;
-    }
-
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.newConcurrentLinkedQueue();
-    }
-
-    /**
-     * THIS CLASS NEEDS TO BE REPLACED/OVERRIDDEN BY ALL USERS. TWITTERS TERMS OF SERVICE SAYS THAT EVERYONE MUST
-     * DELETE TWEETS FROM THEIR DATA STORE IF THEY RECEIVE A DELETE NOTICE.
-     */
-    public static class DeleteHandler extends StreamEventListener {
-
-        public void onDelete(DeletedInteraction di) {
-            //go off and delete the interaction if you have it stored. This is a strict requirement!
-            LOGGER.info("DELETED:\n " + di);
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbb69526/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
new file mode 100644
index 0000000..c4e43e8
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftWebhookData.json
@@ -0,0 +1,30 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "javaType": "org.apache.streams.datasift.DatasiftWebhookData",
+    "properties": {
+        "id": {
+            "type": "string"
+        },
+        "hash": {
+            "type": "string"
+        },
+        "hash_type": {
+            "type": "string"
+        },
+        "count": {
+            "type": "long"
+        },
+        "delivered_at": {
+            "type": "string",
+            "format": "date-time"
+        },
+        "interactions": {
+            "type": "array",
+            "items": {
+                "type": "object",
+                "javaType": "org.apache.streams.datasift.Datasift"
+            }
+        }
+    }
+}
\ No newline at end of file