You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:15 UTC
[8/9] incubator-streams git commit: more tests javadoc headers
refactor DatasiftPushProvider for compatibility
more tests
javadoc headers
refactor DatasiftPushProvider for compatibility
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d5f291a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d5f291a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d5f291a
Branch: refs/heads/STREAMS-222
Commit: 3d5f291a5f66f78d8728db0baf5ab6103ada42bf
Parents: 6c989cb
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:26:43 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:26:43 2014 -0600
----------------------------------------------------------------------
pom.xml | 7 +-
.../datasift/provider/DatasiftPushProvider.java | 3 +-
.../streams-runtime-dropwizard/pom.xml | 28 +++++--
.../dropwizard/GenericWebhookResource.java | 82 +++++++++++++-------
.../dropwizard/StreamDropwizardBuilder.java | 4 +-
.../streams/dropwizard/StreamsApplication.java | 19 ++++-
.../dropwizard/StreamsDropwizardModule.java | 5 +-
.../test/GenericWebhookResourceTest.java | 2 +-
.../src/test/resources/configuration.yml | 13 +---
9 files changed, 108 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 351ec68..317861f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
<surefire.plugin.version>2.17</surefire.plugin.version>
<failsafe.plugin.version>2.17</failsafe.plugin.version>
<slf4j.version>1.7.6</slf4j.version>
+ <hamcrest.version>1.3</hamcrest.version>
<logback.version>1.1.1</logback.version>
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
@@ -260,7 +261,11 @@
<artifactId>config</artifactId>
<version>${typesafe.config.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>${hamcrest.version}</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index a363cb1..cce5930 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.datasift.Datasift;
import org.apache.streams.datasift.DatasiftConfiguration;
@@ -67,7 +66,7 @@ import java.util.regex.Pattern;
@Path("/streams/webhooks/datasift")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
+public class DatasiftPushProvider implements StreamsProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index 045fa86..c18ba9d 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -69,6 +69,19 @@
</dependency>
<dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-metrics</artifactId>
+ <version>${dropwizard.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-testing</artifactId>
+ <version>${dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
@@ -152,12 +165,6 @@
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
- <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
@@ -238,6 +245,15 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <!--<plugin>-->
+ <!--<groupId>org.apache.maven.plugins</groupId>-->
+ <!--<artifactId>maven-resources-plugin</artifactId>-->
+ <!--<resources>test.</resources>-->
+ <!--</plugin>-->
</plugins>
</build>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
index 56d57b0..8416361 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -6,7 +6,6 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
@@ -31,11 +30,17 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
+/**
+ * GenericWebhookResource provides basic webhook connectivity.
+ *
+ * Add processors / persistWriters that read from "GenericWebhookResource" to
+ * consume data posted to streams.
+ */
@Resource
@Path("/streams/webhooks")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+public class GenericWebhookResource implements StreamsProvider {
public GenericWebhookResource() {
}
@@ -57,19 +62,35 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
String body) {
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
- StreamsDatum datum = new StreamsDatum(body);
+ try {
+ ObjectNode item = mapper.readValue(body, ObjectNode.class);
- lock.writeLock().lock();
- ComponentUtils.offerUntilSuccess(datum, providerQueue);
- lock.writeLock().unlock();
+ StreamsDatum datum = new StreamsDatum(body);
+
+ lock.writeLock().lock();
+ ComponentUtils.offerUntilSuccess(datum, providerQueue);
+ lock.writeLock().unlock();
- Boolean success = true;
+ Boolean success = true;
- response.put("success", success);
+ response.put("success", success);
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
+ } catch (Exception e) {
+ log.warn(e.toString(), e);
+
+ Boolean success = false;
+
+ response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+
+ } finally {
+ return Response.status(responseCode).entity(response).build();
+
+ }
}
@POST
@@ -78,19 +99,22 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
String body) {
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
if (body.equalsIgnoreCase("{}")) {
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
+ return Response.status(responseCode).entity(response).build();
}
try {
- for( String item : Splitter.on(newLinePattern).split(body)) {
+ for( String line : Splitter.on(newLinePattern).split(body)) {
+ ObjectNode item = mapper.readValue(line, ObjectNode.class);
+
StreamsDatum datum = new StreamsDatum(item);
lock.writeLock().lock();
@@ -102,8 +126,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
} catch (Exception e) {
log.warn(e.toString(), e);
@@ -111,8 +134,10 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = false;
response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
- return Response.status(500).entity(response).build();
+ } finally {
+ return Response.status(responseCode).entity(response).build();
}
@@ -123,19 +148,17 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
public Response json_meta(@Context HttpHeaders headers,
String body) {
- //log.debug(headers.toString(), headers);
-
- //log.debug(body.toString(), body);
-
ObjectNode response = mapper.createObjectNode();
+ int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
if (body.equalsIgnoreCase("{}")) {
Boolean success = true;
response.put("success", success);
+ responseCode = Response.Status.OK.getStatusCode();
- return Response.status(200).entity(response).build();
+ return Response.status(responseCode).entity(response).build();
}
try {
@@ -144,9 +167,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
for( ObjectNode item : objectWrapper.getData()) {
- String json = mapper.writeValueAsString(item);
-
- StreamsDatum datum = new StreamsDatum(json);
+ StreamsDatum datum = new StreamsDatum(item);
lock.writeLock().lock();
ComponentUtils.offerUntilSuccess(datum, providerQueue);
@@ -156,18 +177,19 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
Boolean success = true;
response.put("success", success);
-
- return Response.status(200).entity(response).build();
+ responseCode = Response.Status.OK.getStatusCode();
} catch (Exception e) {
log.warn(e.toString(), e);
- }
- return Response.status(500).build();
- }
+ Boolean success = false;
+
+ response.put("success", success);
+ responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+ } finally {
+ return Response.status(responseCode).entity(response).build();
+ }
- public List<ObjectNode> getData(GenericWebhookData wrapper) {
- return wrapper.getData();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
index 4292900..524009a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -11,7 +11,9 @@ import java.math.BigInteger;
import java.util.Map;
/**
- * Created by sblackmon on 11/20/14.
+ * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder
+ *
+ * It's a separate class because they will almost certainly deviate going forward
*/
public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 67d0446..fce9852 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -1,5 +1,7 @@
package org.apache.streams.dropwizard;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
@@ -14,6 +16,7 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
import io.dropwizard.Application;
import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.metrics.MetricsFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import org.apache.streams.config.StreamsConfiguration;
@@ -47,12 +50,19 @@ import com.google.inject.Inject;
import javax.annotation.Resource;
import javax.ws.rs.Path;
+/**
+ * Entry point to a dropwizard streams application
+ *
+ * It will start up a stream in the local runtime, as well as bind any
+ * StreamsProvider on the classpath with a @Resource annotation.
+ *
+ */
public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
private static final Logger LOGGER = LoggerFactory
.getLogger(StreamsApplication.class);
- private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected StreamBuilder builder;
@@ -95,6 +105,10 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
resourceProviders.add(provider);
}
+ MetricRegistry metrics = new MetricRegistry();
+ MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory();
+ mfac.configure(environment.lifecycle(), metrics);
+
streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
builder = setup(streamsConfiguration, resourceProviders);
@@ -115,7 +129,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
Map<String, Object> streamConfig = Maps.newHashMap();
streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
- if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ //if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+ streamConfig.put("monitoring_broadcast_interval_ms", -1);
StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
List<String> providers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index f5cd020..1ba07c0 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -13,7 +13,10 @@ import org.apache.streams.config.StreamsConfigurator;
import java.io.IOException;
/**
- * Created by sblackmon on 11/18/14.
+ * This class exists because dropwizard-guice requires at least
+ * one module to run
+ *
+ * Do not expect @Inject StreamsConfiguration to work at the moment.
*/
public class StreamsDropwizardModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
index d1d02ac..db9133b 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -29,7 +29,7 @@ import java.util.List;
import static org.mockito.Mockito.*;
/**
- * Created by sblackmon on 11/21/14.
+ * Tests {@link org.apache.streams.dropwizard.GenericWebhookResource}
*/
public class GenericWebhookResourceTest {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
index e2943cd..778f50a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -1,5 +1,5 @@
template: Hello, %s!
-defaultName: datasift
+defaultName: streams
server:
type: simple
@@ -7,7 +7,7 @@ server:
adminContextPath: /admin
connector:
type: http
- port: 8000
+ port: 8003
logging:
level: DEBUG
@@ -16,12 +16,3 @@ logging:
threshold: ALL
target: stdout
-elasticsearch:
- hosts:
- - "localhost"
- port: 9300
- clusterName: elasticsearch
- index: datasift_webhook
- type: activity
- batchSize: 100
-