You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/02/24 17:33:35 UTC
svn commit: r1571339 - in /incubator/streams/trunk: ./ streams-config-graph/
streams-config/ streams-contrib/
streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/
streams-contrib/streams-persist-elasticsearch/ streams-contr...
Author: mfranklin
Date: Mon Feb 24 16:33:34 2014
New Revision: 1571339
URL: http://svn.apache.org/r1571339
Log:
Reverting changes back to r1569602
Added:
incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
- copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
- copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter (2).iml
- copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter (2).iml
incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter.iml
- copied unchanged from r1569602, incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-twitter.iml
Removed:
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java
Modified:
incubator/streams/trunk/pom.xml
incubator/streams/trunk/streams-config-graph/pom.xml
incubator/streams/trunk/streams-config/pom.xml
incubator/streams/trunk/streams-contrib/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
incubator/streams/trunk/streams-core/pom.xml
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
Modified: incubator/streams/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/pom.xml (original)
+++ incubator/streams/trunk/pom.xml Mon Feb 24 16:33:34 2014
@@ -74,7 +74,7 @@
<commons-io.version>2.4</commons-io.version>
<commons-lang3.version>3.1</commons-lang3.version>
<typesafe.config.version>1.2.0</typesafe.config.version>
- <guava.version>16.0.1</guava.version>
+ <guava.version>15.0</guava.version>
<scala.version>2.8.0</scala.version>
<clojure.version>1.4.0</clojure.version>
<storm.version>0.9.0.1</storm.version>
@@ -94,7 +94,7 @@
<module>streams-config</module>
<module>streams-config-graph</module>
<module>streams-pojo</module>
- <!--<module>streams-storm</module>-->
+ <module>streams-storm</module>
<module>streams-util</module>
<module>streams-pojo-extensions</module>
<module>streams-contrib</module>
Modified: incubator/streams/trunk/streams-config-graph/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config-graph/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config-graph/pom.xml (original)
+++ incubator/streams/trunk/streams-config-graph/pom.xml Mon Feb 24 16:33:34 2014
@@ -37,8 +37,9 @@
<version>1.2</version>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
+ <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>15.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
Modified: incubator/streams/trunk/streams-config/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-config/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-config/pom.xml (original)
+++ incubator/streams/trunk/streams-config/pom.xml Mon Feb 24 16:33:34 2014
@@ -34,10 +34,22 @@
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
+ <version>1.2.0</version>
</dependency>
+ <!--<dependency>-->
+ <!--<groupId>commons-configuration</groupId>-->
+ <!--<artifactId>commons-configuration</artifactId>-->
+ <!--<version>1.10</version>-->
+ <!--</dependency>-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
+ <version>15.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ <version>1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Modified: incubator/streams/trunk/streams-contrib/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/pom.xml Mon Feb 24 16:33:34 2014
@@ -38,17 +38,15 @@
<modules>
<module>streams-persist-console</module>
<module>streams-persist-elasticsearch</module>
- <!--<module>streams-persist-hbase</module>-->
- <!--<module>streams-persist-hdfs</module>-->
- <!--<module>streams-persist-kafka</module>-->
- <!--<module>streams-persist-mongo</module>-->
- <!--<module>streams-provider-datasift</module>-->
- <!--<module>streams-provider-facebook</module>-->
- <!--<module>streams-provider-gnip</module>-->
- <!--<module>streams-provider-moreover</module>-->
+ <module>streams-persist-hdfs</module>
+ <module>streams-persist-kafka</module>
+ <module>streams-provider-datasift</module>
+ <module>streams-provider-facebook</module>
+ <module>streams-provider-gnip</module>
+ <module>streams-provider-moreover</module>
<module>streams-provider-twitter</module>
- <!--<module>streams-provider-sysomos</module>-->
- <!--<module>streams-provider-rss</module>-->
+ <module>streams-provider-sysomos</module>
+ <module>streams-provider-rss</module>
<!--<module>streams-proxy-semantria</module>-->
</modules>
Modified: incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java Mon Feb 24 16:33:34 2014
@@ -5,31 +5,42 @@ import com.fasterxml.jackson.databind.Ob
import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.tasks.StreamsPersistWriterTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-public class ConsolePersistWriter extends StreamsPersistWriterTask implements StreamsPersistWriter {
+public class ConsolePersistWriter implements StreamsPersistWriter {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
+ protected volatile Queue<StreamsDatum> persistQueue;
+
private ObjectMapper mapper = new ObjectMapper();
- public ConsolePersistWriter(StreamsPersistWriter writer) {
- super(writer);
+ public ConsolePersistWriter(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
}
@Override
- public void prepare(Object o) {
- Preconditions.checkNotNull(this.getInputQueues());
+ public void start() {
+ Preconditions.checkNotNull(persistQueue);
+ new Thread(new ConsolePersistWriterTask(this)).start();
}
@Override
- public void cleanUp() {
+ public void stop() {
+
+ }
+ @Override
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ @Override
+ public Queue<StreamsDatum> getPersistQueue() {
+ return this.persistQueue;
}
@Override
Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/pom.xml Mon Feb 24 16:33:34 2014
@@ -35,7 +35,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>0.90.5</version>
+ <version>1.0.0.RC2</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java Mon Feb 24 16:33:34 2014
@@ -1,14 +1,10 @@
package org.apache.streams.elasticsearch;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
@@ -24,7 +20,6 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.*;
@@ -35,7 +30,7 @@ import java.util.concurrent.*;
* steveblackmon
**************************************************************************************************************/
-public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>
+public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>, Runnable
{
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
@@ -58,10 +53,6 @@ public class ElasticsearchPersistReader
private int batchSize = 100;
private String scrollTimeout = null;
- private ObjectMapper mapper = new ObjectMapper();
-
- private ElasticsearchConfiguration config;
-
private QueryBuilder queryBuilder;
private FilterBuilder filterBuilder;
@@ -93,10 +84,16 @@ public class ElasticsearchPersistReader
public void setQueryBuilder(QueryBuilder queryBuilder) { this.queryBuilder = queryBuilder; }
public void setFilterBuilder(FilterBuilder filterBuilder) { this.filterBuilder = filterBuilder; }
- public ElasticsearchPersistReader() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ ListenableFuture providerTaskComplete;
+
+ protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+ return new ThreadPoolExecutor(nThreads, nThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
+
public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration) {
this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
}
@@ -120,59 +117,6 @@ public class ElasticsearchPersistReader
this.withoutfields = withoutfields;
}
- @Override
- public void prepare(Object o) {
-
- // If we haven't already set up the search, then set up the search.
- if(search == null)
- {
- search = elasticsearchClientManager.getClient()
- .prepareSearch(indexes)
- .setSearchType(SearchType.SCAN)
- .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
- .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
-
- if(this.queryBuilder != null)
- search.setQuery(this.queryBuilder);
-
- // If the types are null, then don't specify a type
- if(this.types != null && this.types.length > 0)
- search = search.setTypes(types);
-
- Integer clauses = 0;
- if(this.withfields != null || this.withoutfields != null) {
- if( this.withfields != null )
- clauses += this.withfields.length;
- if( this.withoutfields != null )
- clauses += this.withoutfields.length;
- }
-
- List<FilterBuilder> filterList = buildFilterList();
-
- FilterBuilder allFilters = andFilters(filterList);
-
- if( clauses > 0 ) {
- // search.setPostFilter(allFilters);
- search.setFilter(allFilters);
- }
-
- // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
- if(this.random)
- search = search.addSort(SortBuilders.scriptSort("random()", "number"));
- }
-
- // We don't have a scroll, we need to create a scroll
- if(scrollResp == null) {
- scrollResp = search.execute().actionGet();
- LOGGER.trace(search.toString());
- }
- }
-
- @Override
- public void cleanUp() {
- LOGGER.info("PersistReader done");
- }
-
public void setWithfields(String[] withfields) {
this.withfields = withfields;
}
@@ -191,6 +135,49 @@ public class ElasticsearchPersistReader
{
try
{
+ // If we haven't already set up the search, then set up the search.
+ if(search == null)
+ {
+ search = elasticsearchClientManager.getClient()
+ .prepareSearch(indexes)
+ .setSearchType(SearchType.SCAN)
+ .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
+ .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+
+ if(this.queryBuilder != null)
+ search.setQuery(this.queryBuilder);
+
+ // If the types are null, then don't specify a type
+ if(this.types != null && this.types.length > 0)
+ search = search.setTypes(types);
+
+ Integer clauses = 0;
+ if(this.withfields != null || this.withoutfields != null) {
+ if( this.withfields != null )
+ clauses += this.withfields.length;
+ if( this.withoutfields != null )
+ clauses += this.withoutfields.length;
+ }
+
+ List<FilterBuilder> filterList = buildFilterList();
+
+ FilterBuilder allFilters = andFilters(filterList);
+
+ if( clauses > 0 ) {
+ search.setPostFilter(allFilters);
+ }
+
+ // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
+ if(this.random)
+ search = search.addSort(SortBuilders.scriptSort("random()", "number"));
+ }
+
+ // We don't have a scroll, we need to create a scroll
+ if(scrollResp == null) {
+ scrollResp = search.execute().actionGet();
+ LOGGER.trace(search.toString());
+ }
+
// We have exhausted our scroll create another scroll.
if(scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length)
{
@@ -293,25 +280,44 @@ public class ElasticsearchPersistReader
}
@Override
- public StreamsResultSet readCurrent() {
+ public void run() {
- Queue<StreamsDatum> currentQueue = new LinkedBlockingQueue<StreamsDatum>();
- while( hasNext()) {
- SearchHit hit = next();
- ObjectNode jsonObject = null;
+ providerTaskComplete = executor.submit((new Thread(new ElasticsearchPersistReaderTask(this))));
+
+ while( !providerTaskComplete.isDone()) {
try {
- jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
- } catch (IOException e) {
- e.printStackTrace();
- break;
- }
- StreamsDatum item = new StreamsDatum(jsonObject);
- item.getMetadata().put("id", hit.getId());
- item.getMetadata().put("index", hit.getIndex());
- item.getMetadata().put("type", hit.getType());
- currentQueue.add(item);
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) { }
}
- return (StreamsResultSet)currentQueue;
+
+ stop();
+ }
+
+ @Override
+ public void start() {
+
+
+ }
+
+ @Override
+ public void stop() {
+ shutdownAndAwaitTermination(executor);
+ LOGGER.info("PersistReader done");
+ }
+
+ @Override
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ @Override
+ public Queue<StreamsDatum> getPersistQueue() {
+ return this.persistQueue;
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+ return null;
}
@Override
@@ -324,6 +330,21 @@ public class ElasticsearchPersistReader
return null;
}
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}
-
-
Modified: incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Mon Feb 24 16:33:34 2014
@@ -9,13 +9,11 @@ import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.core.tasks.StreamsPersistWriterTask;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -38,7 +36,7 @@ import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
{
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
@@ -48,13 +46,12 @@ public class ElasticsearchPersistWriter
private Client client;
private String parentID = null;
private BulkRequestBuilder bulkRequest;
- private OutputStreamWriter currentWriter = null;
private String index = null;
private String type = null;
private int batchSize = 50;
private int totalRecordsWritten = 0;
- private boolean veryLargeBulk = false; // by default this setting is set to false
+ private OutputStreamWriter currentWriter = null;
private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
private static final long WAITING_DOCS_LIMIT = 10000;
@@ -71,22 +68,7 @@ public class ElasticsearchPersistWriter
private volatile long batchSizeInBytes = 0;
private volatile int batchItemsSent = 0;
- public void setIndex(String index) {
- this.index = index;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- public void setVeryLargeBulk(boolean veryLargeBulk) {
- this.veryLargeBulk = veryLargeBulk;
- }
-
+ private boolean veryLargeBulk = false; // by default this setting is set to false
private final List<String> affectedIndexes = new ArrayList<String>();
public int getTotalOutstanding() { return this.totalSent - (this.totalFailed + this.totalOk); }
@@ -114,6 +96,44 @@ public class ElasticsearchPersistWriter
public ElasticsearchPersistWriter() {
Config config = StreamsConfigurator.config.getConfig("elasticsearch");
this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public ElasticsearchPersistWriter(Queue<StreamsDatum> persistQueue) {
+ Config config = StreamsConfigurator.config.getConfig("elasticsearch");
+ this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ this.persistQueue = persistQueue;
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
+ this.config = config;
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue) {
+ this.config = config;
+ this.persistQueue = persistQueue;
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index) {
+ this.config = config;
+ this.persistQueue = persistQueue;
+ this.index = index;
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type) {
+ this.config = config;
+ this.persistQueue = persistQueue;
+ this.index = index;
+ this.type = type;
+ }
+
+ public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type, boolean veryLargeBulk) {
+ this.config = config;
+ this.persistQueue = persistQueue;
+ this.index = index;
+ this.type = type;
+ this.veryLargeBulk = veryLargeBulk;
}
private static final int BYTES_IN_MB = 1024*1024;
@@ -124,15 +144,33 @@ public class ElasticsearchPersistWriter
public boolean isConnected() { return (client != null); }
@Override
- public void prepare(Object o) {
+ public void write(StreamsDatum streamsDatum) {
+
+ String json;
+ try {
+
+ json = mapper.writeValueAsString(streamsDatum.getDocument());
+
+ add(index, type, null, json);
+
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+
+ }
+ }
+
+ @Override
+ public void start() {
+
manager = new ElasticsearchClientManager(config);
client = manager.getClient();
LOGGER.info(client.toString());
+
}
@Override
- public void cleanUp() {
+ public void stop() {
try {
flush();
@@ -143,19 +181,34 @@ public class ElasticsearchPersistWriter
}
@Override
- public void write(StreamsDatum streamsDatum) {
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
- String json;
- try {
+ @Override
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
- json = mapper.writeValueAsString(streamsDatum.getDocument());
- add(index, type, null, json);
+ @Override
+ public void run() {
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
+ start();
+
+ task = new Thread(new ElasticsearchPersistWriterTask(this));
+
+ task.start();
+ try {
+ task.join(60000);
+ } catch (InterruptedException e) {
+ stop();
+ return;
}
+ stop();
+ return;
+
}
@Override
@@ -526,5 +579,4 @@ public class ElasticsearchPersistWriter
return toReturn;
}
-
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-rss/streams-provider-rss.iml Mon Feb 24 16:33:34 2014
@@ -1,20 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4">
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
- <output url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/classes" />
- <output-test url="file://$MAVEN_REPOSITORY$/org/apache/streams/streams-master/0.2-incubating-SNAPSHOT/target/test-classes" />
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <output-test url="file://$MODULE_DIR$/target/test-classes" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jaxb2" isTestSource="false" generated="true" />
- <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
<sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
- <excludeFolder url="file://$MODULE_DIR$/target" />
+ <sourceFolder url="file://$MODULE_DIR$/target/generated-sources/jsonschema2pojo" isTestSource="false" generated="true" />
+ <excludeFolder url="file://$MODULE_DIR$/target/classes" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-archiver" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-shared-archive-resources" />
+ <excludeFolder url="file://$MODULE_DIR$/target/maven-status" />
+ <excludeFolder url="file://$MODULE_DIR$/target/surefire-reports" />
+ <excludeFolder url="file://$MODULE_DIR$/target/test-classes" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
- <orderEntry type="library" name="Maven: com.typesafe:config:1.2.0" level="project" />
+ <orderEntry type="library" name="Maven: com.typesafe:config:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.2.1" level="project" />
<orderEntry type="library" name="Maven: org.jsonschema2pojo:jsonschema2pojo-core:0.4.0" level="project" />
@@ -27,13 +32,13 @@
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.11" level="project" />
<orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:annotations:1.3.9" level="project" />
- <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.6" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.6.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.11" level="project" />
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
- <orderEntry type="library" name="Maven: org.apache.streams:streams-core:0.1-SNAPSHOT" level="project" />
+ <orderEntry type="module" module-name="streams-core (2)" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-classic:1.0.9" level="project" />
<orderEntry type="library" name="Maven: ch.qos.logback:logback-core:1.0.9" level="project" />
- <orderEntry type="library" name="Maven: org.apache.streams:streams-pojo:0.1-SNAPSHOT" level="project" />
+ <orderEntry type="module" module-name="streams-pojo (2)" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.module:jackson-module-jaxb-annotations:2.2.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.woodstox:stax2-api:3.1.1" level="project" />
@@ -49,8 +54,8 @@
<orderEntry type="library" name="Maven: com.sun.xml.fastinfoset:FastInfoset:1.2.12" level="project" />
<orderEntry type="library" name="Maven: javax.xml.bind:jsr173_api:1.0" level="project" />
<orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
- <orderEntry type="library" name="Maven: org.apache.streams:streams-config:0.1-SNAPSHOT" level="project" />
- <orderEntry type="library" name="Maven: com.google.guava:guava:16.0.1" level="project" />
+ <orderEntry type="module" module-name="streams-config (2)" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
<orderEntry type="library" name="Maven: com.google.collections:google-collections:1.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.2.1" level="project" />
<orderEntry type="library" name="Maven: com.jayway.jsonpath:json-path:0.9.0" level="project" />
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventProcessor.java Mon Feb 24 16:33:34 2014
@@ -123,12 +123,9 @@ public class TwitterEventProcessor imple
LOGGER.debug("DELETE");
result = mapper.convertValue(event, Delete.class);
}
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
}
- // no supported conversion were applied
+ // no supported conversion were applied
if( result != null )
return result;
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java Mon Feb 24 16:33:34 2014
@@ -33,7 +33,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterStreamProvider implements StreamsProvider, Serializable {
+public class TwitterStreamProvider implements StreamsProvider, Serializable, Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
@@ -77,44 +77,15 @@ public class TwitterStreamProvider imple
Config config = StreamsConfigurator.config.getConfig("twitter");
this.config = TwitterStreamConfigurator.detectConfiguration(config);
this.klass = klass;
- providerQueue = new LinkedBlockingQueue<StreamsDatum>();
}
public TwitterStreamProvider(TwitterStreamConfiguration config, Class klass) {
this.config = config;
this.klass = klass;
- providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
- }
-
- public void run() {
-
- for (int i = 0; i < 10; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
-
- new Thread(new TwitterStreamProviderTask(this)).start();
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
}
@Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public void prepare(Object o) {
+ public void start() {
Preconditions.checkNotNull(this.klass);
@@ -150,12 +121,57 @@ public class TwitterStreamProvider imple
.authentication(auth)
.processor(new StringDelimitedProcessor(inQueue))
.build();
+
+ for (int i = 0; i < 10; i++) {
+ executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
+ }
+
+ new Thread(new TwitterStreamProviderTask(this)).start();
}
@Override
- public void cleanUp() {
+ public void stop() {
for (int i = 0; i < 10; i++) {
inQueue.add(TwitterEventProcessor.TERMINATE);
}
}
+
+ @Override
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ public void setProviderQueue(Queue<StreamsDatum> providerQueue) {
+ this.providerQueue = providerQueue;
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public void run() {
+
+ start();
+
+ while( !executor.isTerminated()) {
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) { }
+ }
+
+ stop();
+ }
+
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java Mon Feb 24 16:33:34 2014
@@ -14,7 +14,6 @@ import org.apache.streams.twitter.Twitte
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;
@@ -29,7 +28,7 @@ import java.util.concurrent.*;
/**
* Created by sblackmon on 12/10/13.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
@@ -88,7 +87,8 @@ public class TwitterTimelineProvider imp
this.klass = klass;
}
- public void run() {
+ @Override
+ public void start() {
Preconditions.checkNotNull(providerQueue);
@@ -133,26 +133,51 @@ public class TwitterTimelineProvider imp
}
@Override
+ public void stop() {
+ for (int i = 0; i < 1; i++) {
+ inQueue.add(TwitterEventProcessor.TERMINATE);
+ }
+
+ shutdownAndAwaitTermination(executor);
+ }
+
+ @Override
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
public StreamsResultSet readCurrent() {
- run();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
+ return null;
}
@Override
public StreamsResultSet readNew(BigInteger sequence) {
- throw new NotImplementedException();
+ return null;
}
@Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
this.start = start;
this.end = end;
- run();
+ start();
StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
return result;
}
+ @Override
+ public void run() {
+
+ start();
+
+ while( !providerTaskComplete.isDone()) {
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException e) { }
+ }
+
+ stop();
+ }
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
@@ -172,52 +197,4 @@ public class TwitterTimelineProvider imp
}
- @Override
- public void prepare(Object o) {
-
- Preconditions.checkNotNull(providerQueue);
-
- Preconditions.checkNotNull(this.klass);
-
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
- Preconditions.checkNotNull(config.getFollow());
-
- Preconditions.checkArgument(config.getEndpoint().equals("statuses/user_timeline"));
-
- Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
- Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
- Iterator<Long> ids = config.getFollow().iterator();
- while( ids.hasNext() ) {
- Long id = ids.next();
-
- String baseUrl = config.getProtocol() + "://" + config.getHost() + ":" + config.getPort() + "/" + config.getVersion() + "/";
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(includeEntitiesEnabled)
- .setJSONStoreEnabled(jsonStoreEnabled)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl);
-
- Twitter twitter = new TwitterFactory(builder.build()).getInstance();
- providerTaskComplete = executor.submit(new TwitterTimelineProviderTask(this, twitter, id));
- }
-
- for (int i = 0; i < 1; i++) {
- executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass));
- }
- }
-
- @Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonEventActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -107,6 +107,10 @@ public abstract class TwitterJsonEventAc
return provider;
}
+ public static List<Object> getLinks(ObjectNode event) {
+ return null;
+ }
+
public static String getUrls(ObjectNode event) {
return null;
}
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -2,18 +2,15 @@ package org.apache.streams.twitter.seria
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
import org.apache.streams.twitter.pojo.Retweet;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
@@ -47,8 +44,6 @@ public class TwitterJsonRetweetActivityS
activity.setProvider(buildProvider(event));
activity.setTitle("");
activity.setContent(retweet.getRetweetedStatus().getText());
- activity.setUrl(getUrls(event));
- activity.setLinks(getLinks(retweet));
addTwitterExtension(activity, event);
addLocationExtension(activity, retweet);
return activity;
@@ -73,14 +68,6 @@ public class TwitterJsonRetweetActivityS
return actObj;
}
- public static List<Object> getLinks(Retweet retweet) {
- List<Object> links = Lists.newArrayList();
- for( Url url : retweet.getRetweetedStatus().getEntities().getUrls() ) {
- links.add(url.getExpandedUrl());
- }
- return links;
- }
-
public static void addLocationExtension(Activity activity, Retweet retweet) {
Map<String, Object> extensions = ensureExtensions(activity);
Map<String, Object> location = new HashMap<String, Object>();
Modified: incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java Mon Feb 24 16:33:34 2014
@@ -2,17 +2,14 @@ package org.apache.streams.twitter.seria
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.twitter.Url;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
@@ -48,7 +45,7 @@ public class TwitterJsonTweetActivitySer
activity.setTitle("");
activity.setContent(tweet.getText());
activity.setUrl(getUrls(event));
- activity.setLinks(getLinks(tweet));
+ activity.setLinks(getLinks(event));
addTwitterExtension(activity, event);
addLocationExtension(activity, tweet);
return activity;
@@ -73,14 +70,6 @@ public class TwitterJsonTweetActivitySer
return actObj;
}
- public static List<Object> getLinks(Tweet tweet) {
- List<Object> links = Lists.newArrayList();
- for( Url url : tweet.getEntities().getUrls() ) {
- links.add(url.getExpandedUrl());
- }
- return links;
- }
-
public static ActivityObject buildTarget(Tweet tweet) {
return null;
}
Modified: incubator/streams/trunk/streams-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/pom.xml?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/pom.xml (original)
+++ incubator/streams/trunk/streams-core/pom.xml Mon Feb 24 16:33:34 2014
@@ -26,6 +26,7 @@
<version>0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
+
<artifactId>streams-core</artifactId>
<packaging>jar</packaging>
@@ -34,11 +35,6 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.streams</groupId>
- <artifactId>streams-util</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
@@ -55,4 +51,4 @@
</testResource>
</testResources>
</build>
-</project>
+</project>
\ No newline at end of file
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java Mon Feb 24 16:33:34 2014
@@ -95,21 +95,4 @@ public class StreamsDatum implements Ser
public void setDocument(Object document) {
this.document = document;
}
-
- @Override
- public boolean equals(Object o) {
- if(o instanceof StreamsDatum) {
- StreamsDatum that = (StreamsDatum) o;
- if(this.document != null && this.document.equals(that.document)) {
- return (this.timestamp != null ? this.timestamp.equals(that.timestamp) : that.timestamp == null) &&
- (this.sequenceid != null ? this.sequenceid.equals(that.sequenceid) : that.sequenceid == null);
- }
- else {
- return that.document == null && this.document == null;
- }
- }
- else {
- return false;
- }
- }
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java Mon Feb 24 16:33:34 2014
@@ -25,22 +25,18 @@ import java.math.BigInteger;
import java.util.Queue;
/**
- *
- * Currently a duplicate interface. Has exact same methods as StreamsProvider.
- * Future work should make this interface necessary I'm told.
- *
+ * Created by sblackmon on 12/13/13.
*/
-public interface StreamsPersistReader extends StreamsProvider {
+public interface StreamsPersistReader {
-// void start();
-// void stop();
-//
-// public void setPersistQueue(Queue<StreamsDatum> persistQueue);
-// public Queue<StreamsDatum> getPersistQueue();
+ void start();
+ void stop();
-// public StreamsResultSet readAll();
-// public StreamsResultSet readNew(BigInteger sequence);
-// public StreamsResultSet readRange(DateTime start, DateTime end);
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+ public Queue<StreamsDatum> getPersistQueue();
+ public StreamsResultSet readAll();
+ public StreamsResultSet readNew(BigInteger sequence);
+ public StreamsResultSet readRange(DateTime start, DateTime end);
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java Mon Feb 24 16:33:34 2014
@@ -24,12 +24,14 @@ import java.util.Queue;
/**
* Created by sblackmon on 12/13/13.
*/
-public interface StreamsPersistWriter extends StreamsOperation{
+public interface StreamsPersistWriter {
+
+ void start();
+ void stop();
+
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue);
+ public Queue<StreamsDatum> getPersistQueue();
- /**
- * Persist the StreamsDatum to the corresponding data store.
- * @param entry to be stored.
- */
public void write( StreamsDatum entry );
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java Mon Feb 24 16:33:34 2014
@@ -24,16 +24,17 @@ import java.util.Queue;
/**
* Created by sblackmon on 12/13/13.
*/
-public interface StreamsProcessor extends StreamsOperation{
+public interface StreamsProcessor {
+ void start();
+ void stop();
+ public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue);
+ public Queue<StreamsDatum> getProcessorInputQueue();
+
+ public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
+ public Queue<StreamsDatum> getProcessorOutputQueue();
- /**
- * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and return the the StreamsDatums that will
- * passed to every down stream operation that reads from this processor.
- * @param entry StreamsDatum to be process
- * @return resulting StreamDatums from process. Should never be null or contain null object. Empty list OK.
- */
public List<StreamsDatum> process( StreamsDatum entry );
}
Modified: incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java?rev=1571339&r1=1571338&r2=1571339&view=diff
==============================================================================
--- incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java (original)
+++ incubator/streams/trunk/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java Mon Feb 24 16:33:34 2014
@@ -26,12 +26,12 @@ import java.util.Queue;
/**
* Created by sblackmon on 12/13/13.
*/
-public interface StreamsProvider extends StreamsOperation {
+public interface StreamsProvider {
-// void start();
-// void stop();
-//
-// public Queue<StreamsDatum> getProviderQueue();
+ void start();
+ void stop();
+
+ public Queue<StreamsDatum> getProviderQueue();
public StreamsResultSet readCurrent();
public StreamsResultSet readNew(BigInteger sequence);