Posted to dev@streams.apache.org by steveblackmon <gi...@git.apache.org> on 2014/07/09 01:40:22 UTC

[GitHub] incubator-streams pull request: Streams 115

GitHub user steveblackmon opened a pull request:

    https://github.com/apache/incubator-streams/pull/51

    Streams 115

    This implementation can retrieve an arbitrary list of user IDs or screen_names with a single token.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-streams STREAMS-115

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-streams/pull/51.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #51
    
----
commit 476748d4518ecfc960550e43bd3d25e312c731a4
Author: Steve Blackmon <sb...@w2odigital.com>
Date:   2014-06-24T04:53:23Z

    allow timeline provider to run as perpetual stream

commit 8b12840903eeb9563381d2e0962d83e118034181
Author: Steve Blackmon <sb...@w2odigital.com>
Date:   2014-06-26T23:49:57Z

    sleep for nonzero mills

commit b7a65e1eaa9f523e3675361d9acde63906afac10
Author: Steve Blackmon <sb...@w2odigital.com>
Date:   2014-07-08T23:37:58Z

    better comments and more optimal retry logic

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on the pull request:

    https://github.com/apache/incubator-streams/pull/51#issuecomment-50202568
  
    :+1: 


---

[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15418422
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---
    @@ -58,47 +56,43 @@ public void run() {
                 int keepTrying = 0;
     
                 // keep trying to load, give it 5 attempts.
    -            //while (keepTrying < 10)
    -            while (keepTrying < 1)
    +            //This value was chosen because it seemed like a reasonable number of times
    +            //to retry capturing a timeline given the sorts of errors that could potentially
    +            //occur (network timeout/interruption, faulty client, etc.)
    +            while (keepTrying < 5)
                 {
     
                     try
                     {
    -                    statuses = twitter.getUserTimeline(id, paging);
    +                    statuses = client.getUserTimeline(id, paging);
     
                         for (Status tStat : statuses)
                         {
    -                        if( provider.start != null &&
    -                            provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
    -                        {
    -                            // they hit the last date we wanted to collect
    -                            // we can now exit early
    -                            KeepGoing = false;
    -                        }
    -                        // emit the record
    -                        String json = DataObjectFactory.getRawJSON(tStat);
    -
    -                        //provider.offer(json);
    +                        String json = TwitterObjectFactory.getRawJSON(tStat);
     
    +                        try {
    +                            provider.lock.readLock().lock();
    --- End diff --
    
    Not sure why this is better, but it's simple, so refactoring.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by robdouglas <gi...@git.apache.org>.
Github user robdouglas commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15954062
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -272,12 +210,11 @@ void shutdownAndAwaitTermination(ExecutorService pool) {
             }
         }
     
    -
         @Override
         public void prepare(Object o) {
    --- End diff --
    
    I'm not sure I agree that using numericIdsOnly and screenNamesOnly is the best call in the prepare method. Is the goal here to ensure that our "id" list contains nothing but numeric IDs, given that the list of identifiers we start with can contain both screenNames and numeric IDs?
    
    If that's the case, then I would suggest getting rid of the numericIdsOnly and screenNamesOnly methods and replacing them with something like this:
    
    ````
    @Override
    public void prepare(Object o) {

        executor = getExecutor();

        // Acquire the lock before entering try, so that unlock() in the
        // finally block only runs when the lock is actually held.
        lock.writeLock().lock();
        try {
            providerQueue = constructQueue();
        } finally {
            lock.writeLock().unlock();
        }

        Preconditions.checkNotNull(providerQueue);
        Preconditions.checkNotNull(this.klass);
        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
        Preconditions.checkNotNull(config.getOauth().getAccessToken());
        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
        Preconditions.checkNotNull(config.getInfo());

        consolidateToIDs();
    }

    /**
     * Using the "info" list that is contained in the configuration, ensure that all
     * account identifiers are converted to IDs (Longs) instead of screenNames (Strings)
     */
    private void consolidateToIDs() {
        List<String> screenNames = Lists.newArrayList();
        ids = Lists.newArrayList();

        for(Object account : config.getInfo()) {
            if(account instanceof String) {
                screenNames.add((String)account);
            } else if (account instanceof Long) {
                // Already a Long: cast rather than round-tripping through a String
                ids.add((Long)account);
            }
        }

        // Twitter allows for batches up to 100 per request, but you cannot mix types
        screenNameBatches = new ArrayList<String[]>();
        while(screenNames.size() >= 100) {
            screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
            screenNames = screenNames.subList(100, screenNames.size());
        }

        // Remainder batch; sizing the array from ids.size() could pad it with nulls
        if(screenNames.size() > 0) {
            screenNameBatches.add(screenNames.toArray(new String[0]));
        }

        // Resolve each batch of screen names to numeric IDs
        for(String[] batch : screenNameBatches) {
            ids.addAll(retrieveIds(batch));
        }
    }
    ````
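
The batching step in the suggestion above (groups of at most 100, never mixing identifier types) can be isolated into a small helper. The following is only a sketch: the class name `BatchSketch`, the method `partition`, and the constant `MAX_BATCH_SIZE` are hypothetical names chosen for illustration, not part of the pull request. Guava, which the provider already uses, offers `Lists.partition` for the same purpose; the hand-written loop below just avoids the dependency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the pull request.
final class BatchSketch {

    // Batch limit quoted in the review comment above.
    static final int MAX_BATCH_SIZE = 100;

    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> screenNames = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            screenNames.add("user" + i);
        }
        for (List<String> batch : partition(screenNames, MAX_BATCH_SIZE)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Because the helper is generic, the same method could serve both the screen-name list and the numeric ID list, provided each list is partitioned separately.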


---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r14898584
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---
    @@ -42,7 +42,7 @@ public static int handleTwitterError(Twitter twitter, Exception exception)
                 {
                     LOGGER.warn("Rate Limit Exceeded");
                     try {
    -                    Thread.sleep(backoff *= 2);
    +                    Thread.sleep(retry);
                     } catch (InterruptedException e1) {}
    --- End diff --
    
    +1.  Let's fix it here since it is already being edited.  At a minimum, the catch block should call Thread.currentThread().interrupt().
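
For readers unfamiliar with the idiom, here is a minimal sketch of what restoring the interrupt status looks like. The class `InterruptAwareSleep` and the method `sleepQuietly` are hypothetical names for illustration; only `Thread.sleep` and `Thread.currentThread().interrupt()` come from the discussion.

```java
// Hypothetical helper, not part of the pull request.
final class InterruptAwareSleep {

    /**
     * Sleeps for the given number of milliseconds.
     * Returns true if the sleep completed, or false if the thread was
     * interrupted. In the interrupted case the interrupt status is restored,
     * so callers further up the stack (or the executor) can still see it.
     */
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();          // simulate a shutdown request
        boolean completed = sleepQuietly(1000);      // returns without waiting
        System.out.println("completed=" + completed
                + " interrupted=" + Thread.currentThread().isInterrupted());
    }
}
```

An empty catch block swallows the signal instead, which can keep a retry loop running after the executor has been asked to shut down.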


---

[GitHub] incubator-streams pull request: Streams 115

Posted by rbnks <gi...@git.apache.org>.
Github user rbnks commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r14898741
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -291,15 +262,55 @@ public void prepare(Object o) {
             Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
             Preconditions.checkNotNull(config.getOauth().getAccessToken());
             Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
    -        Preconditions.checkNotNull(config.getFollow());
    +        Preconditions.checkNotNull(config.getInfo());
    +
    +        List<String> screenNames = new ArrayList<String>();
    +        List<String[]> screenNameBatches = new ArrayList<String[]>();
    +
    +        List<Long> ids = new ArrayList<Long>();
    +        List<Long[]> idsBatches = new ArrayList<Long[]>();
    +
    +        for(String s : config.getInfo()) {
    +            if(s != null)
    +            {
    +                String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
    +
    +                // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
    +                // screen name list
    +                try {
    +                    ids.add(Long.parseLong(potentialScreenName));
    +                } catch (Exception e) {
    --- End diff --
    
    The caught Exception should be NumberFormatException.
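
As an illustration of the narrower catch, the sketch below sorts raw identifiers into numeric IDs and screen names and catches only `NumberFormatException`, the exception `Long.parseLong` throws for non-numeric input. The class `IdentifierSorter` and its fields are hypothetical; the normalization (strip "@", trim, lower-case) mirrors the diff under review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not part of the pull request.
final class IdentifierSorter {

    final List<Long> ids = new ArrayList<>();
    final List<String> screenNames = new ArrayList<>();

    /** Adds one raw identifier to the numeric ID list or the screen name list. */
    void add(String raw) {
        if (raw == null) {
            return;
        }
        String candidate = raw.replace("@", "").trim().toLowerCase(Locale.ROOT);
        if (candidate.isEmpty()) {
            return;
        }
        try {
            ids.add(Long.parseLong(candidate));
        } catch (NumberFormatException e) {
            // Not a number, so treat it as a screen name. Any other
            // exception type is left to propagate instead of being hidden.
            screenNames.add(candidate);
        }
    }

    public static void main(String[] args) {
        IdentifierSorter sorter = new IdentifierSorter();
        for (String s : new String[] {"12345", "@ApacheStreams", null, " 42 "}) {
            sorter.add(s);
        }
        System.out.println("ids=" + sorter.ids + " screenNames=" + sorter.screenNames);
    }
}
```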


---

[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15417781
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -291,15 +262,55 @@ public void prepare(Object o) {
             Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
             Preconditions.checkNotNull(config.getOauth().getAccessToken());
             Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
    -        Preconditions.checkNotNull(config.getFollow());
    +        Preconditions.checkNotNull(config.getInfo());
    +
    +        List<String> screenNames = new ArrayList<String>();
    +        List<String[]> screenNameBatches = new ArrayList<String[]>();
    +
    +        List<Long> ids = new ArrayList<Long>();
    +        List<Long[]> idsBatches = new ArrayList<Long[]>();
    +
    +        for(String s : config.getInfo()) {
    --- End diff --
    
    Legacy code. Agreed, refactoring.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15169742
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    --- End diff --
    
    What is the function of this while loop? You are submitting to an executor service.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15170133
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -291,15 +262,55 @@ public void prepare(Object o) {
             Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
             Preconditions.checkNotNull(config.getOauth().getAccessToken());
             Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
    -        Preconditions.checkNotNull(config.getFollow());
    +        Preconditions.checkNotNull(config.getInfo());
    +
    +        List<String> screenNames = new ArrayList<String>();
    +        List<String[]> screenNameBatches = new ArrayList<String[]>();
    +
    +        List<Long> ids = new ArrayList<Long>();
    +        List<Long[]> idsBatches = new ArrayList<Long[]>();
    +
    +        for(String s : config.getInfo()) {
    --- End diff --
    
    This should be a separate method, potentially even a utility.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15414307
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    --- End diff --
    
    Legacy code.  Agreed, removing.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15169900
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    +        {
    +            try
    +            {
    +                long[] toQuery = new long[ids.length];
    +                for(int i = 0; i < ids.length; i++)
    +                    toQuery[i] = ids[i];
    +
    +                for (User tStat : client.lookupUsers(toQuery)) {
    +
    +                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
    +                    executor.submit(providerTask);
    +
    +                }
    +                keepTrying = 10;
    +            }
    +            catch(TwitterException twitterException) {
    +                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    +            }
    +            catch(Exception e) {
    +                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    +            }
             }
    -        Thread.yield();
         }
     
    -    public StreamsResultSet readCurrent() {
    -        LOGGER.debug("{} readCurrent", STREAMS_ID);
    -
    -        Preconditions.checkArgument(ids.hasNext());
    -        StreamsResultSet result;
    +    private void loadBatch(String[] ids) {
    --- End diff --
    
    This appears to be the exact same logic, but with a different type parameter.  Wouldn't it make more sense to have one method convert the argument to the target type and then use the same logic?
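
One possible shape for that consolidation is sketched below, under the assumption that the two `loadBatch` overloads differ only in how the batch is looked up. Everything here is hypothetical: `BatchLoaderSketch`, `toPrimitive`, and the `lookup` function parameter stand in for the real Twitter client call and the executor submission, neither of which is reproduced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not part of the pull request.
final class BatchLoaderSketch {

    /** Converts a boxed Long[] batch to the primitive long[] form. */
    static long[] toPrimitive(Long[] boxed) {
        long[] out = new long[boxed.length];
        for (int i = 0; i < boxed.length; i++) {
            out[i] = boxed[i];
        }
        return out;
    }

    /**
     * Shared logic for any batch type B: resolve the batch to user IDs via
     * the supplied lookup, then handle each ID the same way.
     * Returns the IDs that were handled, in lookup order.
     */
    static <B> List<Long> loadBatch(B batch, Function<B, List<Long>> lookup) {
        List<Long> handled = new ArrayList<>();
        for (Long userId : lookup.apply(batch)) {
            // The real provider would submit a timeline task here
            handled.add(userId);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Stand-in lookup: pretends each screen name resolves to its length
        List<Long> fromNames = loadBatch(new String[] {"a", "bb"}, names -> {
            List<Long> resolved = new ArrayList<>();
            for (String name : names) {
                resolved.add((long) name.length());
            }
            return resolved;
        });
        System.out.println(fromNames);
    }
}
```

With this shape, the retry and error handling would live in one place, and each caller supplies only the type-specific lookup.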


---

[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15414825
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -272,11 +244,10 @@ void shutdownAndAwaitTermination(ExecutorService pool) {
             }
         }
     
    -
         @Override
         public void prepare(Object o) {
     
    -        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
    +        executor = getExecutor();
             running.set(true);
    --- End diff --
    
    Legacy code. Agreed, improving.


---

[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15170463
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---
    @@ -58,47 +56,43 @@ public void run() {
                 int keepTrying = 0;
     
                 // keep trying to load, give it 5 attempts.
    -            //while (keepTrying < 10)
    -            while (keepTrying < 1)
    +            //This value was chosen because it seemed like a reasonable number of times
    +            //to retry capturing a timeline given the sorts of errors that could potentially
    +            //occur (network timeout/interruption, faulty client, etc.)
    +            while (keepTrying < 5)
                 {
     
                     try
                     {
    -                    statuses = twitter.getUserTimeline(id, paging);
    +                    statuses = client.getUserTimeline(id, paging);
     
                         for (Status tStat : statuses)
                         {
    -                        if( provider.start != null &&
    -                            provider.start.isAfter(new DateTime(tStat.getCreatedAt())))
    -                        {
    -                            // they hit the last date we wanted to collect
    -                            // we can now exit early
    -                            KeepGoing = false;
    -                        }
    -                        // emit the record
    -                        String json = DataObjectFactory.getRawJSON(tStat);
    -
    -                        //provider.offer(json);
    +                        String json = TwitterObjectFactory.getRawJSON(tStat);
     
    +                        try {
    +                            provider.lock.readLock().lock();
    --- End diff --
    
    Broken encapsulation. Rather than accessing the fields of the provider class here, you could create a method in the provider that contains the logic, as follows:
    
    ````
    protected void addDatum(StreamsDatum datum) {
        // acquire the lock before entering try, so finally never unlocks a lock that was not taken
        lock.readLock().lock();
        try {
            ComponentUtils.offerUntilSuccess(datum, providerQueue);
        } finally {
            lock.readLock().unlock();
        }
    }
    ````
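
A minimal, self-contained sketch of the encapsulation suggested above. `SketchProvider` and `SketchTask` are hypothetical stand-ins, not the actual `TwitterTimelineProvider` or `TwitterTimelineProviderTask`; a plain `String` replaces `StreamsDatum`, and a simple offer loop stands in for `ComponentUtils.offerUntilSuccess`. The point is only that the task calls one provider method and never touches the lock or the queue itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the provider; the lock and queue stay private.
class SketchProvider {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final BlockingQueue<String> providerQueue = new LinkedBlockingQueue<>(1000);

    // Tasks call this method instead of reaching into the provider's fields.
    void addDatum(String datum) {
        lock.readLock().lock();
        try {
            // Assumed behaviour of offerUntilSuccess: retry until the queue accepts the item.
            while (!providerQueue.offer(datum)) {
                Thread.yield();
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    int queued() {
        return providerQueue.size();
    }
}

// Hypothetical stand-in for the per-user task; it only knows about addDatum.
class SketchTask implements Runnable {
    private final SketchProvider provider;
    private final String json;

    SketchTask(SketchProvider provider, String json) {
        this.provider = provider;
        this.json = json;
    }

    @Override
    public void run() {
        provider.addDatum(json);
    }
}
```

With this shape, a later change to the locking strategy or the queue type would stay inside the provider.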



[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15169807
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    +        {
    +            try
    --- End diff --
    
    The try/catch/backoff here does not seem necessary, as the timeline provider task will be responsible for handling errors within its own thread.



[GitHub] incubator-streams pull request: Streams 115

Posted by rbnks <gi...@git.apache.org>.
Github user rbnks commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r14898555
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---
    @@ -51,7 +51,7 @@ else if(e.isCausedByNetworkIssue())
                     LOGGER.info("Twitter Network Issues Detected. Backing off...");
                     LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage());
                     try {
    -                    Thread.sleep(backoff *= 2);
    +                    Thread.sleep(retry);
                     } catch (InterruptedException e1) {}
    --- End diff --
    
    Another swallowed InterruptedException.



[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15414816
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    +        {
    +            try
    +            {
    +                long[] toQuery = new long[ids.length];
    +                for(int i = 0; i < ids.length; i++)
    +                    toQuery[i] = ids[i];
    +
    +                for (User tStat : client.lookupUsers(toQuery)) {
    +
    +                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
    +                    executor.submit(providerTask);
    +
    +                }
    +                keepTrying = 10;
    +            }
    +            catch(TwitterException twitterException) {
    +                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    +            }
    +            catch(Exception e) {
    +                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    +            }
             }
    -        Thread.yield();
         }
     
    -    public StreamsResultSet readCurrent() {
    -        LOGGER.debug("{} readCurrent", STREAMS_ID);
    -
    -        Preconditions.checkArgument(ids.hasNext());
    -        StreamsResultSet result;
    +    private void loadBatch(String[] ids) {
    --- End diff --
    
    Legacy code. Agreed, improving.



[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15413791
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---
    @@ -42,8 +42,10 @@ public static int handleTwitterError(Twitter twitter, Exception exception)
                 {
                     LOGGER.warn("Rate Limit Exceeded");
                     try {
    -                    Thread.sleep(backoff *= 2);
    -                } catch (InterruptedException e1) {}
    +                    Thread.sleep(retry);
    --- End diff --
    
    Based on my experiments, I'm certain that this approach works.



[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15414311
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -125,101 +104,88 @@ public TwitterTimelineProvider(TwitterStreamConfiguration config, Class klass) {
         @Override
         public void startStream() {
             LOGGER.debug("{} startStream", STREAMS_ID);
    -        throw new org.apache.commons.lang.NotImplementedException();
    -    }
     
    -    protected void captureTimeline(long currentId) {
    +        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
     
    -        Paging paging = new Paging(1, 200);
    -        List<Status> statuses = null;
    -        boolean KeepGoing = true;
    -        boolean hadFailure = false;
    +        LOGGER.info("readCurrent");
     
    -        do
    -        {
    -            int keepTrying = 0;
    +        while(idsBatches.hasNext())
    +            loadBatch(idsBatches.next());
     
    -            // keep trying to load, give it 5 attempts.
    -            //This value was chosen because it seemed like a reasonable number of times
    -            //to retry capturing a timeline given the sorts of errors that could potentially
    -            //occur (network timeout/interruption, faulty client, etc.)
    -            while (keepTrying < 5)
    -            {
    +        while(screenNameBatches.hasNext())
    +            loadBatch(screenNameBatches.next());
     
    -                try
    -                {
    -                    statuses = client.getUserTimeline(currentId, paging);
    -                    for (Status tStat : statuses) {
    -                        String json = TwitterObjectFactory.getRawJSON(tStat);
    -                        try {
    -                            lock.readLock().lock();
    -                            ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
    -                        } finally {
    -                            lock.readLock().unlock();
    -                        }
    -                    }
    -
    -                    paging.setPage(paging.getPage() + 1);
    -
    -                    keepTrying = 10;
    -                }
    -                catch(TwitterException twitterException) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
    -                }
    -                catch(Exception e) {
    -                    keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
    -                }
    -            }
    -        }
    -        while (shouldContinuePulling(statuses));
    +        executor.shutdown();
         }
     
    -    private Map<Long, Long> userPullInfo;
    -
         protected boolean shouldContinuePulling(List<Status> statuses) {
             return (statuses != null) && (statuses.size() > 0);
         }
     
    -    private void sleep()
    -    {
    -        Thread.yield();
    -        try {
    -            // wait one tenth of a millisecond
    -            Thread.yield();
    -            Thread.sleep(1);
    -            Thread.yield();
    -        }
    -        catch(IllegalArgumentException e) {
    -            // passing in static values, this will never happen
    -        }
    -        catch(InterruptedException e) {
    -            // noOp, there must have been an issue sleeping
    +    private void loadBatch(Long[] ids) {
    +        Twitter client = getTwitterClient();
    +        int keepTrying = 0;
    +
    +        // keep trying to load, give it 5 attempts.
    +        //while (keepTrying < 10)
    +        while (keepTrying < 1)
    +        {
    +            try
    --- End diff --
    
    Legacy code.  Agreed, removing.



[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15170031
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -272,11 +244,10 @@ void shutdownAndAwaitTermination(ExecutorService pool) {
             }
         }
     
    -
         @Override
         public void prepare(Object o) {
     
    -        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
    +        executor = getExecutor();
             running.set(true);
    --- End diff --
    
    Wouldn't you start running on startStream?
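
One way to read the question above, as a hedged sketch: `prepare` only allocates resources, and the `running` flag flips in `startStream`. `LifecycleSketch` is a hypothetical class, not the provider from the pull request, and the pool size of 5 is only borrowed from the removed line in the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical lifecycle holder illustrating where the running flag could change.
class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExecutorService executor;

    // Allocate resources only; nothing is running yet.
    void prepare() {
        executor = Executors.newFixedThreadPool(5);
    }

    // The stream is considered running from this point on.
    void startStream() {
        running.set(true);
    }

    // Clear the flag and release the pool.
    void cleanUp() {
        running.set(false);
        executor.shutdown();
    }

    boolean isRunning() {
        return running.get();
    }
}
```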



[GitHub] incubator-streams pull request: Streams 115

Posted by robdouglas <gi...@git.apache.org>.
Github user robdouglas commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15944963
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -325,4 +274,49 @@ protected Twitter getTwitterClient()
         public void cleanUp() {
             shutdownAndAwaitTermination(executor);
         }
    +
    +    protected List<Long> numericIdsOnly(List<String> allIds) {
    +        List<Long> result = Lists.newArrayList();
    +        for(String id : allIds) {
    +            if(id != null)
    +            {
    +                // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
    +                // screen name list
    +                try {
    +                    result.add(Long.parseLong(id));
    +                } catch (NumberFormatException e) {}
    --- End diff --
    
    The comment above states that if an id is NOT a long then it should be added to the screen name list. Is there a reason why this is not happening in the catch block?
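
For illustration, a single-pass split that does what the code comment describes might look like the sketch below. `IdPartitionSketch` is a hypothetical helper; the pull request may instead handle screen names in a separate method, which the quoted diff does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits mixed identifiers into numeric ids and screen names.
class IdPartitionSketch {
    final List<Long> numericIds = new ArrayList<>();
    final List<String> screenNames = new ArrayList<>();

    static IdPartitionSketch partition(List<String> allIds) {
        IdPartitionSketch result = new IdPartitionSketch();
        for (String id : allIds) {
            if (id == null) {
                continue; // skip missing entries, as the original loop does
            }
            try {
                result.numericIds.add(Long.parseLong(id));
            } catch (NumberFormatException e) {
                // Not a long, so treat it as a screen name instead of dropping it.
                result.screenNames.add(id);
            }
        }
        return result;
    }
}
```

Here the catch block is part of the control flow rather than an empty handler, so code and comment agree.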



[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15969358
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---
    @@ -58,47 +56,39 @@ public void run() {
                 int keepTrying = 0;
     
                 // keep trying to load, give it 5 attempts.
    -            //while (keepTrying < 10)
    -            while (keepTrying < 1)
    +            //This value was chosen because it seemed like a reasonable number of times
    +            //to retry capturing a timeline given the sorts of errors that could potentially
    +            //occur (network timeout/interruption, faulty client, etc.)
    +            while (keepTrying < 5)
                 {
     
                     try
                     {
    -                    statuses = twitter.getUserTimeline(id, paging);
    +                    statuses = client.getUserTimeline(id, paging);
    --- End diff --
    
    works for me



[GitHub] incubator-streams pull request: Streams 115

Posted by rbnks <gi...@git.apache.org>.
Github user rbnks commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r14898532
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---
    @@ -42,7 +42,7 @@ public static int handleTwitterError(Twitter twitter, Exception exception)
                 {
                     LOGGER.warn("Rate Limit Exceeded");
                     try {
    -                    Thread.sleep(backoff *= 2);
    +                    Thread.sleep(retry);
                     } catch (InterruptedException e1) {}
    --- End diff --
    
    I know this isn't part of your pull request, but we shouldn't be swallowing InterruptedExceptions.  Can we fix this or at least open a ticket to fix it?
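
A common way to avoid swallowing the exception is to restore the thread's interrupt flag in the catch block, so callers further up can still see that shutdown was requested. The sketch below assumes a hypothetical helper, `BackoffSketch.sleepQuietly`; it is not part of `TwitterErrorHandler`.

```java
// Hypothetical helper showing a sleep that does not swallow interruption.
class BackoffSketch {
    // Returns true if the full sleep completed, false if the thread was interrupted.
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; set it again for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A retry loop could then stop retrying when `sleepQuietly` returns false instead of continuing as if nothing had happened.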



[GitHub] incubator-streams pull request: Streams 115

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-streams/pull/51



[GitHub] incubator-streams pull request: Streams 115

Posted by robdouglas <gi...@git.apache.org>.
Github user robdouglas commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15954260
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---
    @@ -58,47 +56,39 @@ public void run() {
                 int keepTrying = 0;
     
                 // keep trying to load, give it 5 attempts.
    -            //while (keepTrying < 10)
    -            while (keepTrying < 1)
    +            //This value was chosen because it seemed like a reasonable number of times
    +            //to retry capturing a timeline given the sorts of errors that could potentially
    +            //occur (network timeout/interruption, faulty client, etc.)
    +            while (keepTrying < 5)
                 {
     
                     try
                     {
    -                    statuses = twitter.getUserTimeline(id, paging);
    +                    statuses = client.getUserTimeline(id, paging);
    --- End diff --
    
    Before this step, it may make sense to fetch a new Twitter client.
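
The suggestion above could be sketched as a retry loop that asks a factory for a client on every attempt. Everything here is hypothetical: `FreshClientRetrySketch`, the generic client type, and the use of unchecked exceptions (the real `TwitterException` is a checked exception, so actual code would need different signatures).

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical retry helper that obtains a fresh client before each attempt.
class FreshClientRetrySketch {
    static <C, R> R callWithFreshClient(Supplier<C> clientFactory,
                                        Function<C, R> call,
                                        int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            C client = clientFactory.get(); // new client for every attempt
            try {
                return call.apply(client);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again with another client
            }
        }
        throw new IllegalStateException("all " + maxAttempts + " attempts failed", last);
    }
}
```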



[GitHub] incubator-streams pull request: Streams 115

Posted by steveblackmon <gi...@git.apache.org>.
Github user steveblackmon commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15969353
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---
    @@ -272,12 +210,11 @@ void shutdownAndAwaitTermination(ExecutorService pool) {
             }
         }
     
    -
         @Override
         public void prepare(Object o) {
    --- End diff --
    
    works for me



[GitHub] incubator-streams pull request: Streams 115

Posted by mfranklin <gi...@git.apache.org>.
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/51#discussion_r15169507
  
    --- Diff: streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---
    @@ -42,8 +42,10 @@ public static int handleTwitterError(Twitter twitter, Exception exception)
                 {
                     LOGGER.warn("Rate Limit Exceeded");
                     try {
    -                    Thread.sleep(backoff *= 2);
    -                } catch (InterruptedException e1) {}
    +                    Thread.sleep(retry);
    --- End diff --
    
    Are we sure that calls to the Twitter API during a backoff period do not 'reset' the clock?  I.e., if we make 300 calls in 1 minute and need to wait 14 minutes to reach the 15-minute boundary, would an unsuccessful call at 3 minutes result in the window being reset?
    
    If not, then this approach should work.
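
To make the arithmetic in the question concrete, here is a small sketch that computes how long to sleep until a fixed window ends. It assumes the window is fixed and is not extended by rejected calls, which is exactly the open question above. `RateLimitWaitSketch` and its parameters are hypothetical; in practice the reset time would come from the API's rate-limit information rather than a hard-coded value.

```java
// Hypothetical helper: time left until a fixed rate-limit window resets.
class RateLimitWaitSketch {
    // resetEpochSeconds: when the current window ends, in seconds.
    // nowEpochMillis: the current time, in milliseconds.
    // safetyMarginMillis: extra wait added to tolerate clock skew.
    static long millisUntilReset(long resetEpochSeconds, long nowEpochMillis, long safetyMarginMillis) {
        long remaining = resetEpochSeconds * 1000L - nowEpochMillis;
        return Math.max(0L, remaining) + safetyMarginMillis;
    }
}
```

With a 15-minute window that started at time zero, a call at the 1-minute mark gives `millisUntilReset(900, 60000, 0)`, which is 840000 ms, the 14 minutes mentioned in the question. If a rejected call at 3 minutes did reset the window, the reset time passed in would move and the wait would have to be recomputed.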

