You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/05 20:54:00 UTC

[13/52] [abbrv] git commit: misc improvements

misc improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ee4efbbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ee4efbbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ee4efbbb

Branch: refs/heads/sblackmon
Commit: ee4efbbbdc850f84b20cabf56355b57685ec933b
Parents: eb6f46a
Author: sblackmon <sb...@w2odigital.com>
Authored: Tue Apr 1 00:02:07 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Tue Apr 1 00:02:07 2014 -0500

----------------------------------------------------------------------
 .../gmail/provider/GMailImapProviderTask.java   |  3 +-
 .../google/gmail/provider/GMailProvider.java    | 39 ++++++++++++++----
 .../gmail/provider/GMailRssProviderTask.java    |  3 +-
 .../twitter/provider/TwitterStreamProvider.java | 43 +++++++++++++-------
 .../TwitterJsonTweetActivitySerializer.java     |  2 +-
 streams-runtimes/streams-runtime-local/pom.xml  |  6 +++
 .../local/builders/LocalStreamBuilderTest.java  | 14 ++++---
 7 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
index 068c214..0007a9c 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailImapProviderTask.java
@@ -4,6 +4,7 @@ import com.googlecode.gmail4j.GmailClient;
 import com.googlecode.gmail4j.GmailMessage;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,7 @@ public class GMailImapProviderTask implements Runnable {
             GMailMessageActivitySerializer serializer = new GMailMessageActivitySerializer( this.provider );
             activity = serializer.deserialize(message);
             StreamsDatum entry = new StreamsDatum(activity);
-            this.provider.providerQueue.offer(entry);
+            ComponentUtils.offerUntilSuccess(entry, this.provider.providerQueue);
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index 7ec157e..abd7e47 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -1,6 +1,7 @@
 package com.google.gmail.provider;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gmail.GMailConfiguration;
@@ -13,9 +14,7 @@ import com.googlecode.gmail4j.javamail.ImapGmailConnection;
 import com.googlecode.gmail4j.rss.RssGmailClient;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +26,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class GMailProvider implements StreamsProvider {
+public class GMailProvider implements StreamsProvider, DatumStatusCountable {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(GMailProvider.class);
 
@@ -54,7 +53,7 @@ public class GMailProvider implements StreamsProvider {
     protected GmailClient rssClient;
     protected ImapGmailClient imapClient;
 
-    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    private ExecutorService executor;
 
     private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
@@ -82,14 +81,31 @@ public class GMailProvider implements StreamsProvider {
         this.klass = klass;
     }
 
+    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+
     @Override
     public void startStream() {
-        new Thread(new GMailImapProviderTask(this)).start();
+
+        executor.submit(new GMailImapProviderTask(this));
+
     }
 
     @Override
     public StreamsResultSet readCurrent() {
-        return null;
+
+        StreamsResultSet current;
+
+        synchronized( GMailProvider.class ) {
+            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            providerQueue.clear();
+        }
+
+        return current;
     }
 
     @Override
@@ -118,6 +134,10 @@ public class GMailProvider implements StreamsProvider {
         GmailConnection imapConnection = new ImapGmailConnection();
         imapConnection.setLoginCredentials(config.getUserName(), config.getPassword().toCharArray());
         imapClient.setConnection(imapConnection);
+
+        executor = Executors.newSingleThreadExecutor();
+
+        startStream();
     }
 
     @Override
@@ -128,4 +148,9 @@ public class GMailProvider implements StreamsProvider {
             e.printStackTrace();
         }
     }
+
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
index 73b6d77..d045015 100644
--- a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
+++ b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
@@ -2,6 +2,7 @@ package com.google.gmail.provider;
 
 import com.googlecode.gmail4j.GmailMessage;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,7 +29,7 @@ public class GMailRssProviderTask implements Runnable {
 
             StreamsDatum entry = new StreamsDatum(message);
 
-            this.provider.providerQueue.offer(entry);
+            ComponentUtils.offerUntilSuccess(entry, this.provider.providerQueue);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index e9ce10e..6a3def6 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -3,6 +3,9 @@ package org.apache.streams.twitter.provider;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.twitter.hbc.ClientBuilder;
@@ -28,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
@@ -102,9 +106,15 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
     }
 
     @Override
-    public StreamsResultSet readCurrent() {
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
+    public synchronized StreamsResultSet readCurrent() {
+        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+        Iterators.addAll(currentIterator, providerQueue.iterator());
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+
+        providerQueue.clear();
+
+        return current;
     }
 
     @Override
@@ -124,11 +134,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
         Preconditions.checkNotNull(this.klass);
 
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
         Preconditions.checkNotNull(config.getEndpoint());
         if(config.getEndpoint().endsWith("sample.json") ) {
             endpoint = new StatusesSampleEndpoint();
@@ -145,16 +150,26 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
             return;
 
         Authentication auth;
-        if( config.getOauth() != null ) {
-            auth = new OAuth1(config.getOauth().getConsumerKey(),
-                    config.getOauth().getConsumerSecret(),
-                    config.getOauth().getAccessToken(),
-                    config.getOauth().getAccessTokenSecret());
-        } else if( config.getBasicauth() != null ) {
+        if( config.getBasicauth() != null ) {
+
+            Preconditions.checkNotNull(config.getBasicauth().getUsername());
+            Preconditions.checkNotNull(config.getBasicauth().getPassword());
+
             auth = new BasicAuth(
                     config.getBasicauth().getUsername(),
                     config.getBasicauth().getPassword()
             );
+        } else if( config.getOauth() != null ) {
+
+            Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+            Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+            Preconditions.checkNotNull(config.getOauth().getAccessToken());
+            Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+
+            auth = new OAuth1(config.getOauth().getConsumerKey(),
+                    config.getOauth().getConsumerSecret(),
+                    config.getOauth().getAccessToken(),
+                    config.getOauth().getAccessTokenSecret());
         } else {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
index d258dac..b141482 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
@@ -51,7 +51,7 @@ public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<St
     @Override
     public Activity deserialize(String serialized) throws ActivitySerializerException {
 
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+        ObjectMapper mapper = StreamsTwitterMapper.getInstance();
         Tweet tweet = null;
         try {
             tweet = mapper.readValue(serialized, Tweet.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-runtimes/streams-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml
index 50b8524..ee76b6b 100644
--- a/streams-runtimes/streams-runtime-local/pom.xml
+++ b/streams-runtimes/streams-runtime-local/pom.xml
@@ -71,6 +71,12 @@
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ee4efbbb/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index f627e15..0bdaf61 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -14,6 +14,7 @@ import java.util.HashSet;
 import java.util.Scanner;
 
 import static org.junit.Assert.*;
+import static org.hamcrest.Matchers.*;
 
 /**
  * Basic Tests for the LocalStreamBuilder.
@@ -71,7 +72,7 @@ public class LocalStreamBuilderTest {
             ++count;
             scanner.nextLine();
         }
-        assertEquals(numDatums+1, count);
+        assertThat(count, greaterThan(numDatums)); // using > because number of lines in system.out is non-deterministic
     }
 
     @Test
@@ -90,7 +91,7 @@ public class LocalStreamBuilderTest {
             ++count;
             scanner.nextLine();
         }
-        assertEquals(numDatums+1, count);
+        assertThat(count, greaterThan(numDatums)); // using > because number of lines in system.out is non-deterministic
     }
 
     @Test
@@ -112,7 +113,7 @@ public class LocalStreamBuilderTest {
             ++count;
             scanner.nextLine();
         }
-        assertEquals(numDatums+1, count); //+1 is to make sure cleanup is called on the writer
+        assertThat(count, greaterThan(numDatums)); // using > because number of lines in system.out is non-deterministic
         assertEquals(parallelHint, PassthroughDatumCounterProcessor.claimedNumber.size()); //test 40 were initialized
         assertTrue(PassthroughDatumCounterProcessor.sawData.size() > 1 && PassthroughDatumCounterProcessor.sawData.size() <= parallelHint); //test more than one processor got data
     }
@@ -133,11 +134,11 @@ public class LocalStreamBuilderTest {
         builder.start();
         int count = 0;
         Scanner scanner = new Scanner(new ByteArrayInputStream(out.toByteArray()));
-        while(scanner.hasNextLine()) {
+        while (scanner.hasNextLine()) {
             ++count;
             scanner.nextLine();
         }
-        assertEquals(numDatums1+numDatums2+1, count);
+        assertThat(count, greaterThan(numDatums1 + numDatums2)); // using > because number of lines in system.out is non-deterministic
     }
 
     @Test
@@ -155,7 +156,8 @@ public class LocalStreamBuilderTest {
             ++count;
             scanner.nextLine();
         }
-        assertEquals((numDatums*2)+1, count);
+        assertThat(count, greaterThan(numDatums*2)); // using > because number of lines in system.out is non-deterministic
+
     }