You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/27 19:52:04 UTC

[6/7] git commit: Added build profile for tests, so tests only build and run in java 1.7 or higher

Added build profile for tests, so tests only build and run in java 1.7 or higher


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48b98844
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48b98844
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48b98844

Branch: refs/heads/master
Commit: 48b98844c4c13d79d2330f26e2a0e85612cca011
Parents: a20b2e1
Author: rebanks <re...@w2odigital.com>
Authored: Tue May 27 11:37:34 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Tue May 27 11:37:34 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-datasift/pom.xml           |  31 +++++
 .../provider/DatasiftEventProcessor.java        |   4 +-
 .../com/datasift/test/DatasiftSerDeTest.java    |  63 ---------
 .../provider/DatasiftStreamProviderTest.java    | 127 -------------------
 .../datasift/provider/ErrorHandlerTest.java     |  23 ----
 .../datasift/provider/SubscriptionTest.java     |  41 ------
 .../com/datasift/test/DatasiftSerDeTest.java    |  63 +++++++++
 .../provider/DatasiftStreamProviderTest.java    | 126 ++++++++++++++++++
 .../datasift/provider/ErrorHandlerTest.java     |  22 ++++
 .../datasift/provider/SubscriptionTest.java     |  40 ++++++
 10 files changed, 284 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/pom.xml b/streams-contrib/streams-provider-datasift/pom.xml
index 187ea94..5ff60b7 100644
--- a/streams-contrib/streams-provider-datasift/pom.xml
+++ b/streams-contrib/streams-provider-datasift/pom.xml
@@ -129,4 +129,35 @@
             </plugin>
         </plugins>
     </build>
+    <profiles>
+        <profile>
+            <id>java7</id>
+            <activation>
+                <jdk>[1.7,]</jdk>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>build-helper-maven-plugin</artifactId>
+                        <version>1.8</version>
+                        <executions>
+                            <execution>
+                                <id>add-test-source</id>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>add-test-source</goal>
+                                </goals>
+                                <configuration>
+                                    <sources>
+                                        <source>src/test/java17</source>
+                                    </sources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
index 942e137..09d618c 100644
--- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
+++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftEventProcessor.java
@@ -59,14 +59,14 @@ public class DatasiftEventProcessor implements StreamsProcessor {
             Datasift datasift = mapper.convertValue(item, Datasift.class);
             result.add(this.converter.convert(datasift));
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.error("Exception converting Datasift Interaction to "+this.outClass.getName()+ " : {}", e);
         }
         return result;
     }
 
     @Override
     public void prepare(Object configurationObject) {
-        this.mapper = new StreamsJacksonMapper();
+        this.mapper = StreamsJacksonMapper.getInstance();
         this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer();
         if(this.outClass.equals(Activity.class)) {
             this.converter = new ActivityConverter();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftSerDeTest.java
deleted file mode 100644
index 382a40b..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java/com/datasift/test/DatasiftSerDeTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.datasift.test;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.streams.datasift.Datasift;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 8/20/13
- * Time: 5:57 PM
- * To change this template use File | Settings | File Templates.
- */
-public class DatasiftSerDeTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    @Ignore
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                LOGGER.debug(line);
-
-                Datasift ser = mapper.readValue(line, Datasift.class);
-
-                String de = mapper.writeValueAsString(ser);
-
-                LOGGER.debug(de);
-
-                Datasift serde = mapper.readValue(de, Datasift.class);
-
-                Assert.assertEquals(ser, serde);
-
-                LOGGER.debug(mapper.writeValueAsString(serde));
-            }
-        } catch( Exception e ) {
-            e.printStackTrace();
-            Assert.fail();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java b/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
deleted file mode 100644
index 1076813..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.DataSiftClient;
-import com.datasift.client.stream.StreamEventListener;
-import com.datasift.client.stream.StreamingData;
-import com.google.common.collect.Lists;
-import org.apache.streams.datasift.DatasiftConfiguration;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.times;
-
-/**
- * Requires Java version 1.7!
- */
-public class DatasiftStreamProviderTest {
-
-    private static final String HASH1 = "fake1";
-    private static final String HASH2 = "fake2";
-    private static final String HASH3 = "fake3";
-
-    @Test @Ignore
-    public void startStreamForHash() {
-        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
-        final List<DataSiftClient> clientList = Lists.newLinkedList();
-        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
-        provider.prepare(null);
-        provider.startStreamForHash(HASH1);
-        assertEquals(1, clientList.size());
-        provider.startStreamForHash(HASH2);
-        assertEquals(2, clientList.size());
-        provider.startStreamForHash(HASH3);
-    }
-
-    @Test @Ignore
-    public void testStartStream() {
-        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
-        final List<DataSiftClient> clientList = Lists.newLinkedList();
-        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
-        provider.prepare(null);
-        provider.startStream();
-        assertEquals(3, clientList.size());
-    }
-
-    @Test @Ignore
-    public void testShutDownStream() {
-        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
-        final List<DataSiftClient> clientList = Lists.newLinkedList();
-        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
-        provider.prepare(null);
-        provider.startStream();
-        assertEquals(3, clientList.size());
-        int shutDownCount = 0;
-        DataSiftClient client = clientList.get(0);
-        provider.shutDownStream(HASH1);
-        Mockito.verify(client, times(1)).shutdown();
-        client = clientList.get(1);
-        Mockito.verify(client, times(0)).shutdown();
-        client = clientList.get(2);
-        Mockito.verify(client, times(0)).shutdown();
-
-        provider.shutDownStream(HASH3);
-        Mockito.verify(client, times(1)).shutdown();
-        client = clientList.get(1);
-        Mockito.verify(client, times(0)).shutdown();
-        client = clientList.get(2);
-        Mockito.verify(client, times(1)).shutdown();
-    }
-
-    @Test @Ignore
-    public void testStartAlreadyInprogressStream() {
-        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
-        final List<DataSiftClient> clientList = Lists.newLinkedList();
-        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
-        provider.prepare(null);
-        provider.startStream();
-        assertEquals(3, clientList.size());
-        int shutDownCount = 0;
-        DataSiftClient client = clientList.get(0);
-        provider.startStreamForHash(HASH1);
-        assertEquals(4, clientList.size());
-        Mockito.verify(client, times(1)).shutdown();
-        client = clientList.get(1);
-        Mockito.verify(client, times(0)).shutdown();
-        client = clientList.get(2);
-        Mockito.verify(client, times(0)).shutdown();
-        client = clientList.get(3);
-        Mockito.verify(client, times(0)).shutdown();
-    }
-
-
-
-
-    private DatasiftStreamProvider createStubbedProvider(final List<DataSiftClient> clientList, StreamEventListener listener) {
-        DatasiftStreamProvider provider = new DatasiftStreamProvider(listener, getTestConfiguration()) {
-            @Override
-            protected DataSiftClient getNewClient(String userName, String apiKey) {
-                DataSiftClient client = Mockito.mock(DataSiftClient.class);
-                StreamingData mockData = Mockito.mock(StreamingData.class);
-                Mockito.when(client.liveStream()).thenReturn(mockData);
-                clientList.add(client);
-                return client;
-            }
-        };
-        return provider;
-    }
-
-    private DatasiftConfiguration getTestConfiguration() {
-        DatasiftConfiguration config = new DatasiftConfiguration();
-        config.setUserName("fakeName");
-        config.setApiKey("fakeApiKey");
-        List<String> streamHashes = Lists.newLinkedList();
-        streamHashes.add(HASH1);
-        streamHashes.add(HASH2);
-        streamHashes.add(HASH3);
-        config.setStreamHash(streamHashes);
-        return config;
-    }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/ErrorHandlerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/ErrorHandlerTest.java b/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/ErrorHandlerTest.java
deleted file mode 100644
index 2c31e13..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/ErrorHandlerTest.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.streams.datasift.provider;
-
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Requires Java version 1.7!
- */
-public class ErrorHandlerTest {
-
-    @Test @Ignore
-    public void testErrorHandler() {
-        DatasiftStreamProvider mockProvider = Mockito.mock(DatasiftStreamProvider.class);
-        String streamHash = "fakeHash1";
-        ErrorHandler handler = new ErrorHandler(mockProvider, streamHash);
-        handler.exceptionCaught(new Exception("TEST EXCEPTION"));
-        Mockito.verify(mockProvider).startStreamForHash(streamHash);
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/SubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/SubscriptionTest.java b/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/SubscriptionTest.java
deleted file mode 100644
index 4ca53da..0000000
--- a/streams-contrib/streams-provider-datasift/src/test/java/org/apache/streams/datasift/provider/SubscriptionTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.apache.streams.datasift.provider;
-
-import com.datasift.client.core.Stream;
-import com.datasift.client.stream.Interaction;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Requires Java version 1.7!
- */
-public class SubscriptionTest {
-
-    @Test @Ignore
-    public void testSubscriptionOnMessage() {
-        Stream mockStream = Mockito.mock(Stream.class);
-        Mockito.when(mockStream.hash()).thenReturn("1");
-        Queue<Interaction> interactionQueue = new ConcurrentLinkedQueue<Interaction>();
-        Subscription subscriptionStub = new Subscription(mockStream, interactionQueue);
-        addInteractions(1, subscriptionStub);
-        assertEquals(1, interactionQueue.size());
-        addInteractions(30, subscriptionStub);
-        assertEquals(31, interactionQueue.size());
-        while(!interactionQueue.isEmpty())
-            interactionQueue.poll();
-        addInteractions(5, subscriptionStub);
-        assertEquals(5, interactionQueue.size());
-    }
-
-    private void addInteractions(int numToAdd, Subscription subscription) {
-        for(int i=0; i < numToAdd; ++i) {
-            subscription.onMessage(Mockito.mock(Interaction.class));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
new file mode 100644
index 0000000..382a40b
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java
@@ -0,0 +1,63 @@
+package com.datasift.test;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.streams.datasift.Datasift;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sblackmon
+ * Date: 8/20/13
+ * Time: 5:57 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class DatasiftSerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class);
+
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Ignore
+    @Test
+    public void Tests()
+    {
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                LOGGER.debug(line);
+
+                Datasift ser = mapper.readValue(line, Datasift.class);
+
+                String de = mapper.writeValueAsString(ser);
+
+                LOGGER.debug(de);
+
+                Datasift serde = mapper.readValue(de, Datasift.class);
+
+                Assert.assertEquals(ser, serde);
+
+                LOGGER.debug(mapper.writeValueAsString(serde));
+            }
+        } catch( Exception e ) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
new file mode 100644
index 0000000..2a25dc6
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftStreamProviderTest.java
@@ -0,0 +1,126 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.DataSiftClient;
+import com.datasift.client.stream.StreamEventListener;
+import com.datasift.client.stream.StreamingData;
+import com.google.common.collect.Lists;
+import org.apache.streams.datasift.DatasiftConfiguration;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+
+/**
+ * Requires Java version 1.7!
+ */
+public class DatasiftStreamProviderTest {
+
+    private static final String HASH1 = "fake1";
+    private static final String HASH2 = "fake2";
+    private static final String HASH3 = "fake3";
+
+    @Test
+    public void startStreamForHash() {
+        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
+        final List<DataSiftClient> clientList = Lists.newLinkedList();
+        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
+        provider.prepare(null);
+        provider.startStreamForHash(HASH1);
+        assertEquals(1, clientList.size());
+        provider.startStreamForHash(HASH2);
+        assertEquals(2, clientList.size());
+        provider.startStreamForHash(HASH3);
+    }
+
+    @Test
+    public void testStartStream() {
+        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
+        final List<DataSiftClient> clientList = Lists.newLinkedList();
+        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
+        provider.prepare(null);
+        provider.startStream();
+        assertEquals(3, clientList.size());
+    }
+
+    @Test
+    public void testShutDownStream() {
+        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
+        final List<DataSiftClient> clientList = Lists.newLinkedList();
+        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
+        provider.prepare(null);
+        provider.startStream();
+        assertEquals(3, clientList.size());
+        int shutDownCount = 0;
+        DataSiftClient client = clientList.get(0);
+        provider.shutDownStream(HASH1);
+        Mockito.verify(client, times(1)).shutdown();
+        client = clientList.get(1);
+        Mockito.verify(client, times(0)).shutdown();
+        client = clientList.get(2);
+        Mockito.verify(client, times(0)).shutdown();
+
+        provider.shutDownStream(HASH3);
+        Mockito.verify(client, times(1)).shutdown();
+        client = clientList.get(1);
+        Mockito.verify(client, times(0)).shutdown();
+        client = clientList.get(2);
+        Mockito.verify(client, times(1)).shutdown();
+    }
+
+    @Test
+    public void testStartAlreadyInprogressStream() {
+        DatasiftStreamProvider.DeleteHandler handler = new DatasiftStreamProvider.DeleteHandler();
+        final List<DataSiftClient> clientList = Lists.newLinkedList();
+        DatasiftStreamProvider provider = createStubbedProvider(clientList, handler);
+        provider.prepare(null);
+        provider.startStream();
+        assertEquals(3, clientList.size());
+        int shutDownCount = 0;
+        DataSiftClient client = clientList.get(0);
+        provider.startStreamForHash(HASH1);
+        assertEquals(4, clientList.size());
+        Mockito.verify(client, times(1)).shutdown();
+        client = clientList.get(1);
+        Mockito.verify(client, times(0)).shutdown();
+        client = clientList.get(2);
+        Mockito.verify(client, times(0)).shutdown();
+        client = clientList.get(3);
+        Mockito.verify(client, times(0)).shutdown();
+    }
+
+
+
+
+    private DatasiftStreamProvider createStubbedProvider(final List<DataSiftClient> clientList, StreamEventListener listener) {
+        DatasiftStreamProvider provider = new DatasiftStreamProvider(listener, getTestConfiguration()) {
+            @Override
+            protected DataSiftClient getNewClient(String userName, String apiKey) {
+                DataSiftClient client = Mockito.mock(DataSiftClient.class);
+                StreamingData mockData = Mockito.mock(StreamingData.class);
+                Mockito.when(client.liveStream()).thenReturn(mockData);
+                clientList.add(client);
+                return client;
+            }
+        };
+        return provider;
+    }
+
+    private DatasiftConfiguration getTestConfiguration() {
+        DatasiftConfiguration config = new DatasiftConfiguration();
+        config.setUserName("fakeName");
+        config.setApiKey("fakeApiKey");
+        List<String> streamHashes = Lists.newLinkedList();
+        streamHashes.add(HASH1);
+        streamHashes.add(HASH2);
+        streamHashes.add(HASH3);
+        config.setStreamHash(streamHashes);
+        return config;
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/ErrorHandlerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/ErrorHandlerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/ErrorHandlerTest.java
new file mode 100644
index 0000000..d3c3322
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/ErrorHandlerTest.java
@@ -0,0 +1,22 @@
+package org.apache.streams.datasift.provider;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Requires Java version 1.7!
+ */
+public class ErrorHandlerTest {
+
+    @Test
+    public void testErrorHandler() {
+        DatasiftStreamProvider mockProvider = Mockito.mock(DatasiftStreamProvider.class);
+        String streamHash = "fakeHash1";
+        ErrorHandler handler = new ErrorHandler(mockProvider, streamHash);
+        handler.exceptionCaught(new Exception("TEST EXCEPTION"));
+        Mockito.verify(mockProvider).startStreamForHash(streamHash);
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48b98844/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/SubscriptionTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/SubscriptionTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/SubscriptionTest.java
new file mode 100644
index 0000000..54a815b
--- /dev/null
+++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/SubscriptionTest.java
@@ -0,0 +1,40 @@
+package org.apache.streams.datasift.provider;
+
+import com.datasift.client.core.Stream;
+import com.datasift.client.stream.Interaction;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Requires Java version 1.7!
+ */
+public class SubscriptionTest {
+
+    @Test
+    public void testSubscriptionOnMessage() {
+        Stream mockStream = Mockito.mock(Stream.class);
+        Mockito.when(mockStream.hash()).thenReturn("1");
+        Queue<Interaction> interactionQueue = new ConcurrentLinkedQueue<Interaction>();
+        Subscription subscriptionStub = new Subscription(mockStream, interactionQueue);
+        addInteractions(1, subscriptionStub);
+        assertEquals(1, interactionQueue.size());
+        addInteractions(30, subscriptionStub);
+        assertEquals(31, interactionQueue.size());
+        while(!interactionQueue.isEmpty())
+            interactionQueue.poll();
+        addInteractions(5, subscriptionStub);
+        assertEquals(5, interactionQueue.size());
+    }
+
+    private void addInteractions(int numToAdd, Subscription subscription) {
+        for(int i=0; i < numToAdd; ++i) {
+            subscription.onMessage(Mockito.mock(Interaction.class));
+        }
+    }
+
+}