You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:10 UTC
[3/9] incubator-streams git commit: /streams/webhooks/* confirmed working
/streams/webhooks/* confirmed working
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dbda9ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dbda9ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dbda9ed7
Branch: refs/heads/STREAMS-222
Commit: dbda9ed7075d0c65f0cd3c1db768dc35e63b22c0
Parents: e611290
Author: sblackmon <sb...@apache.org>
Authored: Thu Nov 20 15:28:31 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Thu Nov 20 15:28:31 2014 -0600
----------------------------------------------------------------------
.../apache/streams/datasift/DatasiftPush.json | 30 ---
streams-core/pom.xml | 5 +
.../apache/streams/core/StreamsResource.java | 17 ++
.../streams-runtime-dropwizard/pom.xml | 5 -
.../dropwizard/GenericWebhookResource.java | 217 ++++++++++++++++++
.../dropwizard/StreamDropwizardBuilder.java | 39 ++++
.../streams/dropwizard/StreamsApplication.java | 84 +++++--
.../dropwizard/StreamsDropwizardModule.java | 31 +--
.../streams/dropwizard/WebhookResource.java | 222 -------------------
.../streams/dropwizard/GenericWebhookData.json | 30 +++
10 files changed, 377 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json b/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
deleted file mode 100644
index 50e4f00..0000000
--- a/streams-contrib/streams-provider-datasift/src/main/jsonschema/org/apache/streams/datasift/DatasiftPush.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
- "type": "object",
- "$schema": "http://json-schema.org/draft-03/schema",
- "javaType": "org.apache.streams.datasift.DatasiftPush",
- "properties": {
- "id": {
- "type": "string"
- },
- "hash": {
- "type": "string"
- },
- "hash_type": {
- "type": "string"
- },
- "count": {
- "type": "long"
- },
- "delivered_at": {
- "type": "string",
- "format": "date-time"
- },
- "interactions": {
- "type": "array",
- "items": {
- "type": "object",
- "javaType": "org.apache.streams.datasift.Datasift"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/pom.xml
----------------------------------------------------------------------
diff --git a/streams-core/pom.xml b/streams-core/pom.xml
index 9546b5f..950687f 100644
--- a/streams-core/pom.xml
+++ b/streams-core/pom.xml
@@ -36,6 +36,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
new file mode 100644
index 0000000..4bd18e2
--- /dev/null
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResource.java
@@ -0,0 +1,17 @@
+package org.apache.streams.core;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+
+/**
+ * Created by sblackmon on 11/20/14.
+ */
+public interface StreamsResource {
+
+ public Response json(HttpHeaders headers, String body);
+
+ public Response json_new_line(HttpHeaders headers, String body);
+
+ public Response json_meta(HttpHeaders headers, String body);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index ab6e927..045fa86 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -138,11 +138,6 @@
<artifactId>streams-runtime-local</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-persist-console</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
new file mode 100644
index 0000000..56d57b0
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -0,0 +1,217 @@
+package org.apache.streams.dropwizard;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResource;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.ComponentUtils;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
+@Resource
+@Path("/streams/webhooks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+
+ public GenericWebhookResource() {
+ }
+
+ private static final Logger log = LoggerFactory
+ .getLogger(GenericWebhookResource.class);
+
+ private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
+
+ @POST
+ @Path("json")
+ public Response json(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ StreamsDatum datum = new StreamsDatum(body);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ }
+
+ @POST
+ @Path("json_new_line")
+ public Response json_new_line(@Context HttpHeaders headers,
+ String body) {
+
+ ObjectNode response = mapper.createObjectNode();
+
+ if (body.equalsIgnoreCase("{}")) {
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+ }
+
+ try {
+
+ for( String item : Splitter.on(newLinePattern).split(body)) {
+ StreamsDatum datum = new StreamsDatum(item);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+
+ }
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+
+ Boolean success = false;
+
+ response.put("success", success);
+
+ return Response.status(500).entity(response).build();
+
+ }
+
+ }
+
+ @POST
+ @Path("json_meta")
+ public Response json_meta(@Context HttpHeaders headers,
+ String body) {
+
+ //log.debug(headers.toString(), headers);
+
+ //log.debug(body.toString(), body);
+
+ ObjectNode response = mapper.createObjectNode();
+
+ if (body.equalsIgnoreCase("{}")) {
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+ }
+
+ try {
+
+ GenericWebhookData objectWrapper = mapper.readValue(body, GenericWebhookData.class);
+
+ for( ObjectNode item : objectWrapper.getData()) {
+
+ String json = mapper.writeValueAsString(item);
+
+ StreamsDatum datum = new StreamsDatum(json);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
+ }
+
+ Boolean success = true;
+
+ response.put("success", success);
+
+ return Response.status(200).entity(response).build();
+
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+ }
+
+ return Response.status(500).build();
+ }
+
+ public List<ObjectNode> getData(GenericWebhookData wrapper) {
+ return wrapper.getData();
+ }
+
+ @Override
+ public void startStream() {
+ return;
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+
+ lock.writeLock().lock();
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ providerQueue.clear();
+ lock.writeLock().unlock();
+
+ return current;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return true;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
new file mode 100644
index 0000000..4292900
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -0,0 +1,39 @@
+package org.apache.streams.dropwizard;
+
+import org.apache.streams.core.StreamBuilder;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.local.builders.LocalStreamBuilder;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 11/20/14.
+ */
+public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
+
+ public StreamDropwizardBuilder() {
+ super();
+ }
+
+ public StreamDropwizardBuilder(Map<String, Object> streamConfig) {
+ super(streamConfig);
+ }
+
+ public StreamDropwizardBuilder(int maxQueueCapacity) {
+ super(maxQueueCapacity);
+ }
+
+ public StreamDropwizardBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) {
+ super(maxQueueCapacity, streamConfig);
+ }
+
+ @Override
+ public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider) {
+ return super.newPerpetualStream(streamId, provider);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 733b078..67d0446 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -6,7 +6,9 @@ import com.fasterxml.jackson.datatype.guava.GuavaModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.hubspot.dropwizard.guice.GuiceBundle;
+import com.sun.jersey.api.core.ResourceConfig;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
@@ -14,22 +16,37 @@ import io.dropwizard.Application;
import io.dropwizard.jackson.GuavaExtrasModule;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.console.ConsolePersistWriter;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.core.StreamsProvider;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.local.builders.LocalStreamBuilder;
import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.inject.Inject;
+
+import javax.annotation.Resource;
+import javax.ws.rs.Path;
+
public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
private static final Logger LOGGER = LoggerFactory
@@ -37,11 +54,11 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private StreamBuilder builder;
+ protected StreamBuilder builder;
- private WebhookResource webhook;
+ private static StreamsConfiguration streamsConfiguration;
- private String broadcastURI;
+ private Set<StreamsProvider> resourceProviders = Sets.newConcurrentHashSet();
private Executor executor = Executors.newSingleThreadExecutor();
@@ -70,33 +87,63 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
@Override
public void run(StreamsDropwizardConfiguration streamsDropwizardConfiguration, Environment environment) throws Exception {
- webhook = new WebhookResource();
-
executor = Executors.newSingleThreadExecutor();
- executor.execute(new StreamsDropwizardRunner());
+ for( Class<?> resourceProviderClass : environment.jersey().getResourceConfig().getRootResourceClasses() ) {
+ StreamsProvider provider = (StreamsProvider)resourceProviderClass.newInstance();
+ if( StreamsProvider.class.isInstance(provider))
+ resourceProviders.add(provider);
+ }
+
+ streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
+
+ builder = setup(streamsConfiguration, resourceProviders);
+
+ executor.execute(new StreamsDropwizardRunner(builder, streamsConfiguration));
// wait for streams to start up
Thread.sleep(10000);
- //environment.jersey().register(webhook);
+ for (StreamsProvider resource : resourceProviders) {
+ environment.jersey().register(resource);
+ LOGGER.info("Added resource class: {}", resource);
+ }
+
+ }
+
+ public StreamBuilder setup(StreamsConfiguration streamsConfiguration, Set<StreamsProvider> resourceProviders) {
+
+ Map<String, Object> streamConfig = Maps.newHashMap();
+ streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
+ if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
+ List<String> providers = new ArrayList<>();
+ for( StreamsProvider provider: resourceProviders) {
+ String providerId = provider.getClass().getSimpleName();
+ builder.newPerpetualStream(providerId, provider);
+ providers.add(providerId);
+ }
+
+ return builder;
}
private class StreamsDropwizardRunner implements Runnable {
+ private StreamsConfiguration streamsConfiguration;
+
+ private StreamBuilder builder;
+
+ protected StreamsDropwizardRunner(StreamBuilder builder, StreamsConfiguration streamsConfiguration) {
+ this.streamsConfiguration = streamsConfiguration;
+ this.builder = builder;
+ }
+
@Override
public void run() {
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
- if(! Strings.isNullOrEmpty(broadcastURI) ) streamConfig.put("broadcastURI", broadcastURI);
- builder = new LocalStreamBuilder(1000, streamConfig);
+ builder.start();
- // prepare stream components
- builder.newPerpetualStream("webhooks", webhook);
-
- builder.addStreamsPersistWriter("console", new ConsolePersistWriter(), 1, "webhooks");
}
}
@@ -104,9 +151,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
public static void main(String[] args) throws Exception
{
- StreamsApplication application = new StreamsApplication();
- if( args.length == 1 ) application.broadcastURI = args[0];
- application.run(args);
+ new StreamsApplication().run(args);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index 4264dbb..f5cd020 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -3,6 +3,7 @@ package org.apache.streams.dropwizard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
@@ -18,37 +19,13 @@ public class StreamsDropwizardModule extends AbstractModule {
@Override
protected void configure() {
- // anything you'd like to configure
+ requestStaticInjection(StreamsConfiguration.class);
}
@Provides
- public StreamsConfiguration providesStreamsConfiguration(StreamsDropwizardConfiguration configuration) {
+ @Singleton
+ public StreamsConfiguration providesStreamsConfiguration() {
return StreamsConfigurator.detectConfiguration();
}
-// private StreamsDropwizardConfiguration reconfigure(StreamsDropwizardConfiguration streamsConfiguration) {
-//
-// // config from dropwizard
-// Config configDropwizard = null;
-// try {
-// configDropwizard = ConfigFactory.parseString(mapper.writeValueAsString(streamsConfiguration));
-// } catch (JsonProcessingException e) {
-// e.printStackTrace();
-// LOGGER.error("Invalid Configuration: " + streamsConfiguration);
-// }
-//
-// Config combinedConfig = configTypesafe.withFallback(configDropwizard);
-// String combinedConfigJson = combinedConfig.root().render(ConfigRenderOptions.concise());
-//
-// StreamsDropwizardConfiguration combinedDropwizardConfig = null;
-// try {
-// combinedDropwizardConfig = mapper.readValue(combinedConfigJson, StreamsDropwizardConfiguration.class);
-// } catch (IOException e) {
-// e.printStackTrace();
-// LOGGER.error("Invalid Configuration after merge: " + streamsConfiguration);
-// }
-//
-// return combinedDropwizardConfig;
-//
-// }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
deleted file mode 100644
index 1f80c5c..0000000
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/WebhookResource.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.streams.dropwizard;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Resource;
-import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.math.BigInteger;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-@Resource
-@Path("/streams/webhooks")
-@Produces(MediaType.APPLICATION_JSON)
-@Consumes(MediaType.APPLICATION_JSON)
-public class WebhookResource implements StreamsProvider {
-
- public WebhookResource() {
- }
-
- private static final Logger log = LoggerFactory
- .getLogger(WebhookResource.class);
-
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<>();
-
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- private static Pattern newLinePattern = Pattern.compile("(\\r\\n?|\\n)", Pattern.MULTILINE);
-
- @POST
- @Path("json")
- public Response json(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- StreamsDatum datum = new StreamsDatum(body);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- }
-
- @POST
- @Path("json_new_line")
- public Response json_new_line(@Context HttpHeaders headers,
- String body) {
-
- ObjectNode response = mapper.createObjectNode();
-
- if (body.equalsIgnoreCase("{}")) {
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
- }
-
- try {
-
- for( String item : Splitter.on('\n').split(body)) {
- StreamsDatum datum = new StreamsDatum(item);
-
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
-
- }
-
- Boolean success = true;
-
- response.put("success", success);
-
- return Response.status(200).entity(response).build();
-
- } catch (Exception e) {
- log.warn(e.toString(), e);
-
- Boolean success = false;
-
- response.put("success", success);
-
- return Response.status(500).entity(response).build();
-
- }
-
- }
-
-// @POST
-// @Path("json_meta")
-// public Response json_meta(@Context HttpHeaders headers,
-// String body) {
-//
-// //log.debug(headers.toString(), headers);
-//
-// //log.debug(body.toString(), body);
-//
-// ObjectNode response = mapper.createObjectNode();
-//
-// if (body.equalsIgnoreCase("{}")) {
-//
-// Boolean success = true;
-//
-// response.put("success", success);
-//
-// return Response.status(200).entity(response).build();
-// }
-//
-// try {
-//
-// ObjectNode objectWrapper = mapper.readValue(body, ObjectNode.class);
-//
-// for( ObjectNode item : objectWrapper.getData()) {
-//
-// String json = mapper.writeValueAsString(item);
-//
-// StreamsDatum datum = new StreamsDatum(json, item.getInteraction().getId(), item.getInteraction().getCreatedAt());
-//
-// lock.writeLock().lock();
-// ComponentUtils.offerUntilSuccess(datum, providerQueue);
-// lock.writeLock().unlock();
-// }
-//
-// log.info("interactionQueue: " + providerQueue.size());
-//
-// Boolean success = true;
-//
-// response.put("success", success);
-//
-// return Response.status(200).entity(response).build();
-//
-// } catch (Exception e) {
-// log.warn(e.toString(), e);
-// }
-//
-// return Response.status(500).build();
-// }
-
- @Override
- public void startStream() {
- return;
- }
-
- @Override
- public StreamsResultSet readCurrent() {
-
- StreamsResultSet current;
-
- lock.writeLock().lock();
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
- providerQueue.clear();
- lock.writeLock().unlock();
-
- return current;
-
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return false;
- }
-
- @Override
- public void prepare(Object configurationObject) {
-
- }
-
- @Override
- public void cleanUp() {
-
- }
-
- public void addDatum(StreamsDatum datum) {
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- } finally {
- lock.readLock().unlock();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dbda9ed7/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
new file mode 100644
index 0000000..53a26c7
--- /dev/null
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/jsonschema/org/apache/streams/dropwizard/GenericWebhookData.json
@@ -0,0 +1,30 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "javaType": "org.apache.streams.dropwizard.GenericWebhookData",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "hash": {
+ "type": "string"
+ },
+ "hash_type": {
+ "type": "string"
+ },
+ "count": {
+ "type": "long"
+ },
+ "delivered_at": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "data": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "javaType": "com.fasterxml.jackson.databind.node.ObjectNode"
+ }
+ }
+ }
+}
\ No newline at end of file