You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/13 17:22:56 UTC

[06/14] git commit: merging fixes and addressing serialization problems to get twitter streams-examples working.

merging fixes and addressing serialization problems to get twitter streams-examples working.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7293594e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7293594e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7293594e

Branch: refs/heads/master
Commit: 7293594eac98535e4b2206dca68da9b73e6cdf24
Parents: ae27541 522096e
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu May 8 12:56:36 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu May 8 12:56:36 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  1 -
 .../ElasticsearchConfigurator.java              |  7 ++
 .../ElasticsearchPersistWriter.java             | 80 +++++++++++++++++++-
 .../ElasticsearchWriterConfiguration.json       | 10 +++
 .../twitter/processor/TwitterTypeConverter.java |  2 +-
 .../twitter/provider/TwitterStreamProvider.java | 68 ++++++++---------
 .../provider/TwitterTimelineProvider.java       | 50 ++++++++----
 .../local/tasks/StreamsProviderTask.java        |  2 +-
 8 files changed, 161 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ff7e0f5,405d1b8..80d2775
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -59,96 -50,68 +59,109 @@@ public class ElasticsearchPersistWrite
      private String parentID = null;
      private BulkRequestBuilder bulkRequest;
      private OutputStreamWriter currentWriter = null;
-     private int batchSize = 50;
-     private int totalRecordsWritten = 0;
-     private boolean veryLargeBulk = false;  // by default this setting is set to false
  
+     private long batchSize;
+     private boolean veryLargeBulk;  // by default this setting is set to false
++    private int totalRecordsWritten = 0;
++    
 +    protected Thread task;
  
 -    private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
 -    private static final long WAITING_DOCS_LIMIT = 10000;
 -
 -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
 +    protected volatile Queue<StreamsDatum> persistQueue;
  
+     private volatile int currentItems = 0;
      private volatile int totalSent = 0;
      private volatile int totalSeconds = 0;
      private volatile int totalAttempted = 0;
      private volatile int totalOk = 0;
      private volatile int totalFailed = 0;
      private volatile int totalBatchCount = 0;
+     private volatile int totalRecordsWritten = 0;
      private volatile long totalSizeInBytes = 0;
 -
      private volatile long batchSizeInBytes = 0;
      private volatile int batchItemsSent = 0;
 +    private volatile int totalByteCount = 0;
 +    private volatile int byteCount = 0;
 +
 +    public ElasticsearchPersistWriter() {
 +        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
 +        this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
 +    }
 +
 +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
 +        this.config = config;
 +    }
 +
 +    public void setBatchSize(int batchSize) {
 +        this.batchSize = batchSize;
 +    }
 +
 +    public void setVeryLargeBulk(boolean veryLargeBulk) {
 +        this.veryLargeBulk = veryLargeBulk;
 +    }
  
+     private final List<String> affectedIndexes = new ArrayList<String>();
  
 -    public int getTotalOutstanding()                           { return this.totalSent - (this.totalFailed + this.totalOk); }
 -    public long getFlushThresholdSizeInBytes()                 { return flushThresholdSizeInBytes; }
 -    public int getTotalSent()                                  { return totalSent; }
 -    public int getTotalSeconds()                               { return totalSeconds; }
 -    public int getTotalOk()                                    { return totalOk; }
 -    public int getTotalFailed()                                { return totalFailed; }
 -    public int getTotalBatchCount()                            { return totalBatchCount; }
 -    public long getTotalSizeInBytes()                          { return totalSizeInBytes; }
 -    public long getBatchSizeInBytes()                          { return batchSizeInBytes; }
 -    public int getBatchItemsSent()                             { return batchItemsSent; }
 -    public List<String> getAffectedIndexes()                   { return this.affectedIndexes; }
 +    public int getTotalOutstanding() {
 +        return this.totalSent - (this.totalFailed + this.totalOk);
 +    }
  
 -    public void setFlushThresholdSizeInBytes(long sizeInBytes)  { this.flushThresholdSizeInBytes = sizeInBytes; }
 +    public long getFlushThresholdSizeInBytes() {
 +        return flushThresholdSizeInBytes;
 +    }
  
 -    Thread task;
 +    public int getTotalSent() {
 +        return totalSent;
 +    }
  
 -    protected volatile Queue<StreamsDatum> persistQueue;
 +    public int getTotalSeconds() {
 +        return totalSeconds;
 +    }
  
 -    private ObjectMapper mapper;
 +    public int getTotalOk() {
 +        return totalOk;
 +    }
++    
++    private ObjectMapper mapper = new StreamsJacksonMapper();
  
 -    private ElasticsearchWriterConfiguration config;
 +    public int getTotalFailed() {
 +        return totalFailed;
 +    }
  
 -    public ElasticsearchPersistWriter() {
 -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
 -        this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
 +    public int getTotalBatchCount() {
 +        return totalBatchCount;
      }
  
 -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
 -        this.config = config;
 +    public long getTotalSizeInBytes() {
 +        return totalSizeInBytes;
 +    }
 +
 +    public long getBatchSizeInBytes() {
 +        return batchSizeInBytes;
 +    }
 +
 +    public int getBatchItemsSent() {
 +        return batchItemsSent;
 +    }
 +
 +    public List<String> getAffectedIndexes() {
 +        return this.affectedIndexes;
 +    }
 +
 +    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
 +        this.flushThresholdSizeInBytes = sizeInBytes;
      }
  
 +    public boolean isConnected() {
 +        return (client != null);
 +    }
 +
++    private ElasticsearchWriterConfiguration config;
++
+     private static final int  BYTES_IN_MB = 1024*1024;
+     private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+     private volatile int  totalByteCount = 0;
+     private volatile int  byteCount = 0;
 -
 -    public boolean isConnected() 		                { return (client != null); }
 -
++    
      @Override
      public void write(StreamsDatum streamsDatum) {
  
@@@ -280,10 -238,13 +293,11 @@@
              // reset the current batch statistics
              this.batchSizeInBytes = 0;
              this.batchItemsSent = 0;
+             this.currentItems = 0;
  
 -            try
 -            {
 +            try {
                  int count = 0;
 -                if(this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
 -                {
 +                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                      /****************************************************************************
                       * Author:
                       * Smashew
@@@ -381,8 -402,60 +395,60 @@@
          }
      }
  
+     private void trackItemAndBytesWritten(long sizeInBytes)
+     {
+         currentItems++;
+         batchItemsSent++;
+         batchSizeInBytes += sizeInBytes;
+ 
+         // If our queue is larger than our flush threashold, then we should flush the queue.
+         if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+                 (currentItems >= batchSize) )
+             flushInternal();
+     }
+ 
+     private void checkAndCreateBulkRequest()
+     {
+         // Synchronize to ensure that we don't lose any records
+         synchronized (this)
+         {
+             if(bulkRequest == null)
+                 bulkRequest = this.manager.getClient().prepareBulk();
+         }
+     }
+ 
+     private void checkIndexImplications(String indexName)
+     {
+ 
+         // check to see if we have seen this index before.
+         if(this.affectedIndexes.contains(indexName))
+             return;
+ 
+         // we haven't log this index.
+         this.affectedIndexes.add(indexName);
+ 
+         // Check to see if we are in 'veryLargeBulk' mode
+         // if we aren't, exit early
+         if(!this.veryLargeBulk)
+             return;
+ 
+ 
+         // They are in 'very large bulk' mode we want to turn off refreshing the index.
+ 
+         // Create a request then add the setting to tell it to stop refreshing the interval
+         UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+         updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+ 
+         // submit to ElasticSearch
+         this.manager.getClient()
+                 .admin()
+                 .indices()
+                 .updateSettings(updateSettingsRequest)
+                 .actionGet();
+     }
+ 
      public void createIndexIfMissing(String indexName) {
 -        if(!this.manager.getClient()
 +        if (!this.manager.getClient()
                  .admin()
                  .indices()
                  .exists(new IndicesExistsRequest(indexName))
@@@ -446,91 -524,20 +512,99 @@@
          return toReturn;
      }
  
+     @Override
+     public void prepare(Object configurationObject) {
+         mapper = StreamsJacksonMapper.getInstance();
+         veryLargeBulk = this.config.getBulk();
+         batchSize = this.config.getBatchSize();
+         start();
+     }
++    
 +    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
 +        bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
 +            @Override
 +            public void onResponse(BulkResponse bulkItemResponses) {
 +                if (bulkItemResponses.hasFailures())
 +                    LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
  
 -    @Override
 -    public DatumStatusCounter getDatumStatusCounter() {
 -        DatumStatusCounter counters = new DatumStatusCounter();
 -        counters.incrementAttempt(this.batchItemsSent);
 -        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
 -        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
 -        return counters;
 +                long thisFailed = 0;
 +                long thisOk = 0;
 +                long thisMillis = bulkItemResponses.getTookInMillis();
 +
 +                // keep track of the number of totalFailed and items that we have totalOk.
 +                for (BulkItemResponse resp : bulkItemResponses.getItems()) {
 +                    if (resp.isFailed())
 +                        thisFailed++;
 +                    else
 +                        thisOk++;
 +                }
 +
 +                totalAttempted += thisSent;
 +                totalOk += thisOk;
 +                totalFailed += thisFailed;
 +                totalSeconds += (thisMillis / 1000);
 +
 +                if (thisSent != (thisOk + thisFailed))
 +                    LOGGER.error("We sent more items than this");
 +
 +                LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
 +                        MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
 +                        MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
 +            }
 +
 +            @Override
 +            public void onFailure(Throwable e) {
 +                LOGGER.error("Error bulk loading: {}", e.getMessage());
 +                e.printStackTrace();
 +            }
 +        });
 +
 +        this.notify();
 +    }
 +
 +    private void trackItemAndBytesWritten(long sizeInBytes) {
 +        batchItemsSent++;
 +        batchSizeInBytes += sizeInBytes;
 +
 +        // If our queue is larger than our flush threashold, then we should flush the queue.
 +        if (batchSizeInBytes > flushThresholdSizeInBytes)
 +            flushInternal();
 +    }
 +
 +    private void checkAndCreateBulkRequest() {
 +        // Synchronize to ensure that we don't lose any records
 +        synchronized (this) {
 +            if (bulkRequest == null)
 +                bulkRequest = this.manager.getClient().prepareBulk();
 +        }
 +    }
 +
 +    private void checkIndexImplications(String indexName) {
 +
 +        // check to see if we have seen this index before.
 +        if (this.affectedIndexes.contains(indexName))
 +            return;
 +
 +        // we haven't log this index.
 +        this.affectedIndexes.add(indexName);
 +
 +        // Check to see if we are in 'veryLargeBulk' mode
 +        // if we aren't, exit early
 +        if (!this.veryLargeBulk)
 +            return;
 +
 +
 +        // They are in 'very large bulk' mode we want to turn off refreshing the index.
 +
 +        // Create a request then add the setting to tell it to stop refreshing the interval
 +        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
 +        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
 +
 +        // submit to ElasticSearch
 +        this.manager.getClient()
 +                .admin()
 +                .indices()
 +                .updateSettings(updateSettingsRequest)
 +                .actionGet();
      }
  }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b456fa4,20ee951..db1ec76
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@@ -1,7 -1,8 +1,9 @@@
  package org.apache.streams.twitter.provider;
  
+ import com.google.common.base.Optional;
  import com.google.common.base.Preconditions;
 +import com.google.common.collect.Queues;
+ import com.google.common.util.concurrent.ListenableFuture;
  import com.google.common.util.concurrent.ListeningExecutorService;
  import com.google.common.util.concurrent.MoreExecutors;
  import com.typesafe.config.Config;
@@@ -20,11 -21,10 +23,13 @@@ import twitter4j.json.DataObjectFactory
  
  import java.io.Serializable;
  import java.math.BigInteger;
- import java.util.*;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Queue;
 -import java.util.concurrent.*;
 +import java.util.concurrent.ArrayBlockingQueue;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
  
  /**
   * Created by sblackmon on 12/10/13.
@@@ -47,16 -47,22 +52,20 @@@ public class TwitterTimelineProvider im
          this.config = config;
      }
  
 -    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
 +    protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500));
  
 +    protected int idsCount;
+     protected Twitter client;
      protected Iterator<Long> ids;
  
 -    ListenableFuture providerTaskComplete;
 -//
 -//    public BlockingQueue<Object> getInQueue() {
 -//        return inQueue;
 -//    }
 -
      protected ListeningExecutorService executor;
  
      protected DateTime start;
      protected DateTime end;
  
++    Boolean jsonStoreEnabled;
++    Boolean includeEntitiesEnabled;
++
      private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                  5000L, TimeUnit.MILLISECONDS,
@@@ -88,12 -93,36 +96,13 @@@
          return this.providerQueue;
      }
  
 -//    public void run() {
 -//
 -//        LOGGER.info("{} Running", STREAMS_ID);
 -//
 -//        while( ids.hasNext() ) {
 -//            Long currentId = ids.next();
 -//            LOGGER.info("Provider Task Starting: {}", currentId);
 -//            captureTimeline(currentId);
 -//        }
 -//
 -//        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
 -//
 -//        client.shutdown();
 -//
 -//        LOGGER.info("{} Exiting", STREAMS_ID);
 -//
 -//        while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
 -//            try {
 -//                Thread.sleep(100);
 -//            } catch (InterruptedException e) {}
 -//        }
 -//    }
 -
      @Override
      public void startStream() {
-         // no op
+         LOGGER.debug("{} startStream", STREAMS_ID);
+         throw new org.apache.commons.lang.NotImplementedException();
      }
  
 -    private void captureTimeline(long currentId) {
 +    protected void captureTimeline(long currentId) {
  
          Paging paging = new Paging(1, 200);
          List<Status> statuses = null;
@@@ -223,14 -251,14 +235,19 @@@
          Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
          Preconditions.checkNotNull(config.getOauth().getAccessToken());
          Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
 -
          Preconditions.checkNotNull(config.getFollow());
  
 -        Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
 -        Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
 -
 +        idsCount = config.getFollow().size();
          ids = config.getFollow().iterator();
+ 
++        jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
++        includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
++
 +    }
 +
++    
 +    protected Twitter getTwitterClient()
 +    {
          String baseUrl = "https://api.twitter.com:443/1.1/";
  
          ConfigurationBuilder builder = new ConfigurationBuilder()
@@@ -243,9 -271,11 +260,10 @@@
                  .setAsyncNumThreads(3)
                  .setRestBaseURL(baseUrl)
                  .setIncludeMyRetweetEnabled(Boolean.TRUE)
+                 .setIncludeRTsEnabled(Boolean.TRUE)
                  .setPrettyDebugEnabled(Boolean.TRUE);
  
 -        client = new TwitterFactory(builder.build()).getInstance();
 -
 +        return new TwitterFactory(builder.build()).getInstance();
      }
  
      @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------