You are viewing a plain-text version of this content. The link to its canonical version ("here") was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/13 17:22:56 UTC
[06/14] git commit: merging fixes and addressing serialization problems to get twitter streams-examples working.
merging fixes and addressing serialization problems to get twitter streams-examples working.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7293594e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7293594e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7293594e
Branch: refs/heads/master
Commit: 7293594eac98535e4b2206dca68da9b73e6cdf24
Parents: ae27541 522096e
Author: sblackmon <sb...@w2odigital.com>
Authored: Thu May 8 12:56:36 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Thu May 8 12:56:36 2014 -0500
----------------------------------------------------------------------
.../streams-persist-elasticsearch/pom.xml | 1 -
.../ElasticsearchConfigurator.java | 7 ++
.../ElasticsearchPersistWriter.java | 80 +++++++++++++++++++-
.../ElasticsearchWriterConfiguration.json | 10 +++
.../twitter/processor/TwitterTypeConverter.java | 2 +-
.../twitter/provider/TwitterStreamProvider.java | 68 ++++++++---------
.../provider/TwitterTimelineProvider.java | 50 ++++++++----
.../local/tasks/StreamsProviderTask.java | 2 +-
8 files changed, 161 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ff7e0f5,405d1b8..80d2775
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -59,96 -50,68 +59,109 @@@ public class ElasticsearchPersistWrite
private String parentID = null;
private BulkRequestBuilder bulkRequest;
private OutputStreamWriter currentWriter = null;
- private int batchSize = 50;
- private int totalRecordsWritten = 0;
- private boolean veryLargeBulk = false; // by default this setting is set to false
+ private long batchSize;
+ private boolean veryLargeBulk; // by default this setting is set to false
++ private int totalRecordsWritten = 0;
++
+ protected Thread task;
- private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
- private static final long WAITING_DOCS_LIMIT = 10000;
-
- public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
+ protected volatile Queue<StreamsDatum> persistQueue;
+ private volatile int currentItems = 0;
private volatile int totalSent = 0;
private volatile int totalSeconds = 0;
private volatile int totalAttempted = 0;
private volatile int totalOk = 0;
private volatile int totalFailed = 0;
private volatile int totalBatchCount = 0;
+ private volatile int totalRecordsWritten = 0;
private volatile long totalSizeInBytes = 0;
-
private volatile long batchSizeInBytes = 0;
private volatile int batchItemsSent = 0;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
+
+ public ElasticsearchPersistWriter() {
+ Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+ this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
+ this.config = config;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setVeryLargeBulk(boolean veryLargeBulk) {
+ this.veryLargeBulk = veryLargeBulk;
+ }
+ private final List<String> affectedIndexes = new ArrayList<String>();
- public int getTotalOutstanding() { return this.totalSent - (this.totalFailed + this.totalOk); }
- public long getFlushThresholdSizeInBytes() { return flushThresholdSizeInBytes; }
- public int getTotalSent() { return totalSent; }
- public int getTotalSeconds() { return totalSeconds; }
- public int getTotalOk() { return totalOk; }
- public int getTotalFailed() { return totalFailed; }
- public int getTotalBatchCount() { return totalBatchCount; }
- public long getTotalSizeInBytes() { return totalSizeInBytes; }
- public long getBatchSizeInBytes() { return batchSizeInBytes; }
- public int getBatchItemsSent() { return batchItemsSent; }
- public List<String> getAffectedIndexes() { return this.affectedIndexes; }
+ public int getTotalOutstanding() {
+ return this.totalSent - (this.totalFailed + this.totalOk);
+ }
- public void setFlushThresholdSizeInBytes(long sizeInBytes) { this.flushThresholdSizeInBytes = sizeInBytes; }
+ public long getFlushThresholdSizeInBytes() {
+ return flushThresholdSizeInBytes;
+ }
- Thread task;
+ public int getTotalSent() {
+ return totalSent;
+ }
- protected volatile Queue<StreamsDatum> persistQueue;
+ public int getTotalSeconds() {
+ return totalSeconds;
+ }
- private ObjectMapper mapper;
+ public int getTotalOk() {
+ return totalOk;
+ }
++
++ private ObjectMapper mapper = new StreamsJacksonMapper();
- private ElasticsearchWriterConfiguration config;
+ public int getTotalFailed() {
+ return totalFailed;
+ }
- public ElasticsearchPersistWriter() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config);
+ public int getTotalBatchCount() {
+ return totalBatchCount;
}
- public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
- this.config = config;
+ public long getTotalSizeInBytes() {
+ return totalSizeInBytes;
+ }
+
+ public long getBatchSizeInBytes() {
+ return batchSizeInBytes;
+ }
+
+ public int getBatchItemsSent() {
+ return batchItemsSent;
+ }
+
+ public List<String> getAffectedIndexes() {
+ return this.affectedIndexes;
+ }
+
+ public void setFlushThresholdSizeInBytes(long sizeInBytes) {
+ this.flushThresholdSizeInBytes = sizeInBytes;
}
+ public boolean isConnected() {
+ return (client != null);
+ }
+
++ private ElasticsearchWriterConfiguration config;
++
+ private static final int BYTES_IN_MB = 1024*1024;
+ private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
-
- public boolean isConnected() { return (client != null); }
-
++
@Override
public void write(StreamsDatum streamsDatum) {
@@@ -280,10 -238,13 +293,11 @@@
// reset the current batch statistics
this.batchSizeInBytes = 0;
this.batchItemsSent = 0;
+ this.currentItems = 0;
- try
- {
+ try {
int count = 0;
- if(this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
- {
+ if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
/****************************************************************************
* Author:
* Smashew
@@@ -381,8 -402,60 +395,60 @@@
}
}
+ private void trackItemAndBytesWritten(long sizeInBytes)
+ {
+ currentItems++;
+ batchItemsSent++;
+ batchSizeInBytes += sizeInBytes;
+
+ // If our queue is larger than our flush threashold, then we should flush the queue.
+ if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+ (currentItems >= batchSize) )
+ flushInternal();
+ }
+
+ private void checkAndCreateBulkRequest()
+ {
+ // Synchronize to ensure that we don't lose any records
+ synchronized (this)
+ {
+ if(bulkRequest == null)
+ bulkRequest = this.manager.getClient().prepareBulk();
+ }
+ }
+
+ private void checkIndexImplications(String indexName)
+ {
+
+ // check to see if we have seen this index before.
+ if(this.affectedIndexes.contains(indexName))
+ return;
+
+ // we haven't log this index.
+ this.affectedIndexes.add(indexName);
+
+ // Check to see if we are in 'veryLargeBulk' mode
+ // if we aren't, exit early
+ if(!this.veryLargeBulk)
+ return;
+
+
+ // They are in 'very large bulk' mode we want to turn off refreshing the index.
+
+ // Create a request then add the setting to tell it to stop refreshing the interval
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+ updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+
+ // submit to ElasticSearch
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .updateSettings(updateSettingsRequest)
+ .actionGet();
+ }
+
public void createIndexIfMissing(String indexName) {
- if(!this.manager.getClient()
+ if (!this.manager.getClient()
.admin()
.indices()
.exists(new IndicesExistsRequest(indexName))
@@@ -446,91 -524,20 +512,99 @@@
return toReturn;
}
+ @Override
+ public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
+ veryLargeBulk = this.config.getBulk();
+ batchSize = this.config.getBatchSize();
+ start();
+ }
++
+ private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+ bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+ @Override
+ public void onResponse(BulkResponse bulkItemResponses) {
+ if (bulkItemResponses.hasFailures())
+ LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
- @Override
- public DatumStatusCounter getDatumStatusCounter() {
- DatumStatusCounter counters = new DatumStatusCounter();
- counters.incrementAttempt(this.batchItemsSent);
- counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
- counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
- return counters;
+ long thisFailed = 0;
+ long thisOk = 0;
+ long thisMillis = bulkItemResponses.getTookInMillis();
+
+ // keep track of the number of totalFailed and items that we have totalOk.
+ for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+ if (resp.isFailed())
+ thisFailed++;
+ else
+ thisOk++;
+ }
+
+ totalAttempted += thisSent;
+ totalOk += thisOk;
+ totalFailed += thisFailed;
+ totalSeconds += (thisMillis / 1000);
+
+ if (thisSent != (thisOk + thisFailed))
+ LOGGER.error("We sent more items than this");
+
+ LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+ MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
+ MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ LOGGER.error("Error bulk loading: {}", e.getMessage());
+ e.printStackTrace();
+ }
+ });
+
+ this.notify();
+ }
+
+ private void trackItemAndBytesWritten(long sizeInBytes) {
+ batchItemsSent++;
+ batchSizeInBytes += sizeInBytes;
+
+ // If our queue is larger than our flush threashold, then we should flush the queue.
+ if (batchSizeInBytes > flushThresholdSizeInBytes)
+ flushInternal();
+ }
+
+ private void checkAndCreateBulkRequest() {
+ // Synchronize to ensure that we don't lose any records
+ synchronized (this) {
+ if (bulkRequest == null)
+ bulkRequest = this.manager.getClient().prepareBulk();
+ }
+ }
+
+ private void checkIndexImplications(String indexName) {
+
+ // check to see if we have seen this index before.
+ if (this.affectedIndexes.contains(indexName))
+ return;
+
+ // we haven't log this index.
+ this.affectedIndexes.add(indexName);
+
+ // Check to see if we are in 'veryLargeBulk' mode
+ // if we aren't, exit early
+ if (!this.veryLargeBulk)
+ return;
+
+
+ // They are in 'very large bulk' mode we want to turn off refreshing the index.
+
+ // Create a request then add the setting to tell it to stop refreshing the interval
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+ updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+
+ // submit to ElasticSearch
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .updateSettings(updateSettingsRequest)
+ .actionGet();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b456fa4,20ee951..db1ec76
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@@ -1,7 -1,8 +1,9 @@@
package org.apache.streams.twitter.provider;
+ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
@@@ -20,11 -21,10 +23,13 @@@ import twitter4j.json.DataObjectFactory
import java.io.Serializable;
import java.math.BigInteger;
- import java.util.*;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Created by sblackmon on 12/10/13.
@@@ -47,16 -47,22 +52,20 @@@ public class TwitterTimelineProvider im
this.config = config;
}
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500));
+ protected int idsCount;
+ protected Twitter client;
protected Iterator<Long> ids;
- ListenableFuture providerTaskComplete;
-//
-// public BlockingQueue<Object> getInQueue() {
-// return inQueue;
-// }
-
protected ListeningExecutorService executor;
protected DateTime start;
protected DateTime end;
++ Boolean jsonStoreEnabled;
++ Boolean includeEntitiesEnabled;
++
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
@@@ -88,12 -93,36 +96,13 @@@
return this.providerQueue;
}
-// public void run() {
-//
-// LOGGER.info("{} Running", STREAMS_ID);
-//
-// while( ids.hasNext() ) {
-// Long currentId = ids.next();
-// LOGGER.info("Provider Task Starting: {}", currentId);
-// captureTimeline(currentId);
-// }
-//
-// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID);
-//
-// client.shutdown();
-//
-// LOGGER.info("{} Exiting", STREAMS_ID);
-//
-// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) {
-// try {
-// Thread.sleep(100);
-// } catch (InterruptedException e) {}
-// }
-// }
-
@Override
public void startStream() {
- // no op
+ LOGGER.debug("{} startStream", STREAMS_ID);
+ throw new org.apache.commons.lang.NotImplementedException();
}
- private void captureTimeline(long currentId) {
+ protected void captureTimeline(long currentId) {
Paging paging = new Paging(1, 200);
List<Status> statuses = null;
@@@ -223,14 -251,14 +235,19 @@@
Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
Preconditions.checkNotNull(config.getOauth().getAccessToken());
Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
Preconditions.checkNotNull(config.getFollow());
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
+ idsCount = config.getFollow().size();
ids = config.getFollow().iterator();
+
++ jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
++ includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
++
+ }
+
++
+ protected Twitter getTwitterClient()
+ {
String baseUrl = "https://api.twitter.com:443/1.1/";
ConfigurationBuilder builder = new ConfigurationBuilder()
@@@ -243,9 -271,11 +260,10 @@@
.setAsyncNumThreads(3)
.setRestBaseURL(baseUrl)
.setIncludeMyRetweetEnabled(Boolean.TRUE)
+ .setIncludeRTsEnabled(Boolean.TRUE)
.setPrettyDebugEnabled(Boolean.TRUE);
- client = new TwitterFactory(builder.build()).getInstance();
-
+ return new TwitterFactory(builder.build()).getInstance();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7293594e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------