You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/21 20:35:15 UTC

[8/9] incubator-streams git commit: more tests; javadoc headers; refactor DatasiftPushProvider for compatibility

more tests
javadoc headers
refactor DatasiftPushProvider for compatibility


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3d5f291a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3d5f291a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3d5f291a

Branch: refs/heads/STREAMS-222
Commit: 3d5f291a5f66f78d8728db0baf5ab6103ada42bf
Parents: 6c989cb
Author: sblackmon <sb...@apache.org>
Authored: Fri Nov 21 13:26:43 2014 -0600
Committer: sblackmon <sb...@apache.org>
Committed: Fri Nov 21 13:26:43 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  7 +-
 .../datasift/provider/DatasiftPushProvider.java |  3 +-
 .../streams-runtime-dropwizard/pom.xml          | 28 +++++--
 .../dropwizard/GenericWebhookResource.java      | 82 +++++++++++++-------
 .../dropwizard/StreamDropwizardBuilder.java     |  4 +-
 .../streams/dropwizard/StreamsApplication.java  | 19 ++++-
 .../dropwizard/StreamsDropwizardModule.java     |  5 +-
 .../test/GenericWebhookResourceTest.java        |  2 +-
 .../src/test/resources/configuration.yml        | 13 +---
 9 files changed, 108 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 351ec68..317861f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
         <surefire.plugin.version>2.17</surefire.plugin.version>
         <failsafe.plugin.version>2.17</failsafe.plugin.version>
         <slf4j.version>1.7.6</slf4j.version>
+        <hamcrest.version>1.3</hamcrest.version>
         <logback.version>1.1.1</logback.version>
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
@@ -260,7 +261,11 @@
                 <artifactId>config</artifactId>
                 <version>${typesafe.config.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-all</artifactId>
+                <version>${hamcrest.version}</version>
+            </dependency>
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
index a363cb1..cce5930 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftPushProvider.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.datasift.Datasift;
 import org.apache.streams.datasift.DatasiftConfiguration;
@@ -67,7 +66,7 @@ import java.util.regex.Pattern;
 @Path("/streams/webhooks/datasift")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class DatasiftPushProvider implements StreamsProvider, StreamsResource {
+public class DatasiftPushProvider implements StreamsProvider {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftPushProvider.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/pom.xml b/streams-runtimes/streams-runtime-dropwizard/pom.xml
index 045fa86..c18ba9d 100644
--- a/streams-runtimes/streams-runtime-dropwizard/pom.xml
+++ b/streams-runtimes/streams-runtime-dropwizard/pom.xml
@@ -69,6 +69,19 @@
         </dependency>
 
         <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-metrics</artifactId>
+            <version>${dropwizard.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-testing</artifactId>
+            <version>${dropwizard.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
             <version>${jackson.version}</version>
@@ -152,12 +165,6 @@
             <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
 
-        <!-- This ensures slf4j-log4j12 is not packaged in implementations -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
@@ -238,6 +245,15 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+            <!--<plugin>-->
+                <!--<groupId>org.apache.maven.plugins</groupId>-->
+                <!--<artifactId>maven-resources-plugin</artifactId>-->
+                <!--<resources>test.</resources>-->
+            <!--</plugin>-->
         </plugins>
     </build>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
index 56d57b0..8416361 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/GenericWebhookResource.java
@@ -6,7 +6,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResource;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
@@ -31,11 +30,17 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Pattern;
 
+/**
+ * GenericWebhookResource provides basic webhook connectivity.
+ *
+ * Add processors / persistWriters that read from "GenericWebhookResource" to
+ * consume data posted to streams.
+ */
 @Resource
 @Path("/streams/webhooks")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class GenericWebhookResource implements StreamsProvider, StreamsResource {
+public class GenericWebhookResource implements StreamsProvider {
 
     public GenericWebhookResource() {
     }
@@ -57,19 +62,35 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
                                   String body) {
 
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
-        StreamsDatum datum = new StreamsDatum(body);
+        try {
+            ObjectNode item = mapper.readValue(body, ObjectNode.class);
 
-        lock.writeLock().lock();
-        ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        lock.writeLock().unlock();
+            StreamsDatum datum = new StreamsDatum(body);
+
+            lock.writeLock().lock();
+            ComponentUtils.offerUntilSuccess(datum, providerQueue);
+            lock.writeLock().unlock();
 
-        Boolean success = true;
+            Boolean success = true;
 
-        response.put("success", success);
+            response.put("success", success);
 
-        return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
+        } catch (Exception e) {
+            log.warn(e.toString(), e);
+
+            Boolean success = false;
+
+            response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+
+        } finally {
+            return Response.status(responseCode).entity(response).build();
+
+        }
     }
 
     @POST
@@ -78,19 +99,22 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
                                            String body) {
 
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
         if (body.equalsIgnoreCase("{}")) {
 
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
+            return Response.status(responseCode).entity(response).build();
         }
 
         try {
 
-            for( String item : Splitter.on(newLinePattern).split(body)) {
+            for( String line : Splitter.on(newLinePattern).split(body)) {
+                ObjectNode item = mapper.readValue(line, ObjectNode.class);
+
                 StreamsDatum datum = new StreamsDatum(item);
 
                 lock.writeLock().lock();
@@ -102,8 +126,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
         } catch (Exception e) {
             log.warn(e.toString(), e);
@@ -111,8 +134,10 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = false;
 
             response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
-            return Response.status(500).entity(response).build();
+        } finally {
+            return Response.status(responseCode).entity(response).build();
 
         }
 
@@ -123,19 +148,17 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
     public Response json_meta(@Context HttpHeaders headers,
                                        String body) {
 
-        //log.debug(headers.toString(), headers);
-
-        //log.debug(body.toString(), body);
-
         ObjectNode response = mapper.createObjectNode();
+        int responseCode = Response.Status.BAD_REQUEST.getStatusCode();
 
         if (body.equalsIgnoreCase("{}")) {
 
             Boolean success = true;
 
             response.put("success", success);
+            responseCode = Response.Status.OK.getStatusCode();
 
-            return Response.status(200).entity(response).build();
+            return Response.status(responseCode).entity(response).build();
         }
 
         try {
@@ -144,9 +167,7 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
 
             for( ObjectNode item : objectWrapper.getData()) {
 
-                String json = mapper.writeValueAsString(item);
-
-                StreamsDatum datum = new StreamsDatum(json);
+                StreamsDatum datum = new StreamsDatum(item);
 
                 lock.writeLock().lock();
                 ComponentUtils.offerUntilSuccess(datum, providerQueue);
@@ -156,18 +177,19 @@ public class GenericWebhookResource implements StreamsProvider, StreamsResource
             Boolean success = true;
 
             response.put("success", success);
-
-            return Response.status(200).entity(response).build();
+            responseCode = Response.Status.OK.getStatusCode();
 
         } catch (Exception e) {
             log.warn(e.toString(), e);
-        }
 
-        return Response.status(500).build();
-    }
+            Boolean success = false;
+
+            response.put("success", success);
+            responseCode = Response.Status.BAD_REQUEST.getStatusCode();
+        } finally {
+            return Response.status(responseCode).entity(response).build();
+        }
 
-    public List<ObjectNode> getData(GenericWebhookData wrapper) {
-        return wrapper.getData();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
index 4292900..524009a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamDropwizardBuilder.java
@@ -11,7 +11,9 @@ import java.math.BigInteger;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 11/20/14.
+ * StreamDropwizardBuilder is currently a light wrapper around LocalStreamBuilder
+ *
+ * It's a separate class because they will almost certainly diverge going forward
  */
 public class StreamDropwizardBuilder extends LocalStreamBuilder implements StreamBuilder {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
index 67d0446..fce9852 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsApplication.java
@@ -1,5 +1,7 @@
 package org.apache.streams.dropwizard;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.guava.GuavaModule;
@@ -14,6 +16,7 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
 import io.dropwizard.Application;
 import io.dropwizard.jackson.GuavaExtrasModule;
+import io.dropwizard.metrics.MetricsFactory;
 import io.dropwizard.setup.Bootstrap;
 import io.dropwizard.setup.Environment;
 import org.apache.streams.config.StreamsConfiguration;
@@ -47,12 +50,19 @@ import com.google.inject.Inject;
 import javax.annotation.Resource;
 import javax.ws.rs.Path;
 
+/**
+ * Entry point to a dropwizard streams application
+ *
+ * It will start up a stream in the local runtime, as well as bind any
+ * StreamsProvider on the classpath with a @Resource annotation.
+ *
+ */
 public class StreamsApplication extends Application<StreamsDropwizardConfiguration> {
 
     private static final Logger LOGGER = LoggerFactory
 			.getLogger(StreamsApplication.class);
 
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    protected static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
     protected StreamBuilder builder;
 
@@ -95,6 +105,10 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
                 resourceProviders.add(provider);
         }
 
+        MetricRegistry metrics = new MetricRegistry();
+        MetricsFactory mfac = streamsDropwizardConfiguration.getMetricsFactory();
+        mfac.configure(environment.lifecycle(), metrics);
+
         streamsConfiguration = mapper.convertValue(streamsDropwizardConfiguration, StreamsConfiguration.class);
 
         builder = setup(streamsConfiguration, resourceProviders);
@@ -115,7 +129,8 @@ public class StreamsApplication extends Application<StreamsDropwizardConfigurati
 
         Map<String, Object> streamConfig = Maps.newHashMap();
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 20 * 60 * 1000 * 1000);
-        if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        //if(! Strings.isNullOrEmpty(streamsConfiguration.getBroadcastURI()) ) streamConfig.put("broadcastURI", streamsConfiguration.getBroadcastURI());
+        streamConfig.put("monitoring_broadcast_interval_ms", -1);
         StreamBuilder builder = new StreamDropwizardBuilder(1000, streamConfig);
 
         List<String> providers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
index f5cd020..1ba07c0 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/main/java/org/apache/streams/dropwizard/StreamsDropwizardModule.java
@@ -13,7 +13,10 @@ import org.apache.streams.config.StreamsConfigurator;
 import java.io.IOException;
 
 /**
- * Created by sblackmon on 11/18/14.
+ * This class exists because dropwizard-guice requires at least
+ * one module to run
+ *
+ * Do not expect @Inject StreamsConfiguration to work at the moment.
  */
 public class StreamsDropwizardModule extends AbstractModule {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
index d1d02ac..db9133b 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/java/org/apache/streams/dropwizard/test/GenericWebhookResourceTest.java
@@ -29,7 +29,7 @@ import java.util.List;
 import static org.mockito.Mockito.*;
 
 /**
- * Created by sblackmon on 11/21/14.
+ * Tests {@link org.apache.streams.dropwizard.GenericWebhookResource}
  */
 public class GenericWebhookResourceTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3d5f291a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
index e2943cd..778f50a 100644
--- a/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
+++ b/streams-runtimes/streams-runtime-dropwizard/src/test/resources/configuration.yml
@@ -1,5 +1,5 @@
 template: Hello, %s!
-defaultName: datasift
+defaultName: streams
 
 server:
   type: simple
@@ -7,7 +7,7 @@ server:
   adminContextPath: /admin
   connector:
     type: http
-    port: 8000
+    port: 8003
 
 logging:
   level: DEBUG
@@ -16,12 +16,3 @@ logging:
       threshold: ALL
       target: stdout
 
-elasticsearch:
-  hosts:
-    - "localhost"
-  port: 9300
-  clusterName: elasticsearch
-  index: datasift_webhook
-  type: activity
-  batchSize: 100
-