You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2017/01/04 04:01:30 UTC
incubator-streams git commit: Replace Guava APIs with Java 8,
this closes apache/incubator-streams#347
Repository: incubator-streams
Updated Branches:
refs/heads/master e40e6287e -> 7810361d2
Replace Guava APIs with Java 8, this closes apache/incubator-streams#347
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7810361d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7810361d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7810361d
Branch: refs/heads/master
Commit: 7810361d2e5a2f236967b3208053ec80b0e5885a
Parents: e40e628
Author: smarthi <sm...@apache.org>
Authored: Tue Jan 3 23:00:20 2017 -0500
Committer: smarthi <sm...@apache.org>
Committed: Tue Jan 3 23:00:20 2017 -0500
----------------------------------------------------------------------
.../filters/test/VerbDefinitionFilterTest.java | 23 ++++++++++----------
.../amazon/kinesis/KinesisPersistReader.java | 7 +++---
.../amazon/kinesis/KinesisPersistWriter.java | 4 ++--
.../org/apache/streams/s3/S3PersistReader.java | 7 +++---
.../apache/streams/s3/S3PersistReaderTask.java | 4 ++--
.../test/TestMetadataFromDocumentProcessor.java | 4 ++--
.../streams/hdfs/WebHdfsPersistReader.java | 4 ++--
.../streams/hdfs/WebHdfsPersistReaderTask.java | 6 ++---
.../streams/hdfs/WebHdfsPersistWriter.java | 4 ++--
.../regex/RegexHashtagExtractorTest.java | 17 +++++++++------
.../regex/RegexMentionExtractorTest.java | 23 +++++++++++---------
.../streams/regex/RegexUrlExtractorTest.java | 21 ++++++++++--------
.../provider/FacebookDataCollector.java | 4 ++--
.../provider/FacebookFriendFeedProvider.java | 7 +++---
.../provider/FacebookFriendUpdatesProvider.java | 14 ++++++------
.../facebook/provider/FacebookProvider.java | 10 ++++-----
.../provider/FacebookUserstreamProvider.java | 21 +++++++++---------
.../streams/moreover/MoreoverProvider.java | 3 +--
.../streams/moreover/MoreoverProviderTask.java | 4 +---
.../sysomos/provider/SysomosProvider.java | 15 +++++--------
.../streams/sysomos/util/SysomosUtils.java | 4 ++--
.../test/providers/EmptyResultSetProvider.java | 9 ++++----
.../test/providers/NumericMessageProvider.java | 7 +++---
.../test/component/FileReaderProvider.java | 4 ++--
24 files changed, 114 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
index 09cb2e6..48b9204 100644
--- a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
+++ b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
@@ -26,10 +26,11 @@ import org.apache.streams.pojo.json.Activity;
import org.apache.streams.verbs.VerbDefinition;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
import org.junit.Test;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Tests for {$link: org.apache.streams.verbs.VerbDefinitionResolver}
@@ -44,7 +45,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testVerbMatchFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -60,7 +61,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testProviderFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/provider.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"provider\":{\"id\":\"providerId\",\"objectType\":\"product\"}}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -76,7 +77,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testActorFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/actor.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -93,7 +94,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testObjectFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\"}}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -109,7 +110,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testMultiFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"follow\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -129,7 +130,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testTargetRequired() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"id\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\",\"objectType\":\"task\"}}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -145,7 +146,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testAllWildcard() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -180,7 +181,7 @@ public class VerbDefinitionFilterTest {
VerbDefinition object = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"), VerbDefinition.class);
VerbDefinition target = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"), VerbDefinition.class);
VerbDefinition follow = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"), VerbDefinition.class);
- VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(provider,actor,object,target,follow));
+ VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(provider,actor,object,target,follow).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -214,7 +215,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testVerbDropFilter() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class);
- VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Sets.newHashSet(definition));
+ VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
@@ -230,7 +231,7 @@ public class VerbDefinitionFilterTest {
@Test
public void testDropAllWildcard() throws Exception {
VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class);
- VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Sets.newHashSet(definition));
+ VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet()));
filter.prepare(null);
StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class));
List<StreamsDatum> result1 = filter.process(datum1);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
index 42cce8a..5b18a91 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -34,9 +34,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import com.google.common.collect.Queues;
import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +129,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable
StreamsResultSet current;
synchronized( KinesisPersistReader.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue));
persistQueue.clear();
}
return current;
@@ -162,7 +161,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
this.client = new AmazonKinesisClient(credentials, clientConfig);
- if (!Strings.isNullOrEmpty(config.getRegion()))
+ if (StringUtils.isNotEmpty(config.getRegion()))
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
}
streamNames = this.config.getStreams();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
index 6e2db0f..d528f15 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -35,8 +35,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,7 +124,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter {
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
this.client = new AmazonKinesisClient(credentials, clientConfig);
- if (!Strings.isNullOrEmpty(config.getRegion())) {
+ if (StringUtils.isNotEmpty(config.getRegion())) {
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index da1a00e..3a2cf3b 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -37,8 +37,8 @@ import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
import com.google.common.collect.Queues;
+import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +47,7 @@ import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -132,7 +133,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
clientOptions.setPathStyleAccess(false);
this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
- if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) {
+ if (StringUtils.isNotEmpty(s3ReaderConfiguration.getRegion())) {
this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
}
this.amazonS3Client.setS3ClientOptions(clientOptions);
@@ -206,7 +207,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
StreamsResultSet current;
synchronized ( S3PersistReader.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue));
current.setCounter(new DatumStatusCounter());
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 82bcba7..775e7e0 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -22,7 +22,7 @@ import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.util.ComponentUtils;
-import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class S3PersistReaderTask implements Runnable {
String line;
try {
while ((line = bufferedReader.readLine()) != null) {
- if ( !Strings.isNullOrEmpty(line) ) {
+ if (StringUtils.isNotEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
StreamsDatum entry = reader.lineReaderUtil.processLine(line);
ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
index 8b45eb2..a0b483f 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -25,7 +25,6 @@ import org.apache.streams.pojo.json.Activity;
import org.apache.streams.pojo.json.ActivityObject;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Before;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -74,7 +74,7 @@ public class TestMetadataFromDocumentProcessor {
.getResourceAsStream("activities");
List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8);
- Set<ActivityObject> objects = Sets.newHashSet();
+ Set<ActivityObject> objects = new HashSet<>();
for( String file : files) {
LOGGER.info("File: " + file );
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 0036fea..a673d8f 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -53,6 +53,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -217,7 +218,6 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
}
streamsConfiguration = StreamsConfigurator.detectConfiguration();
persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue()));
- //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
executor = Executors.newSingleThreadExecutor();
mapper = StreamsJacksonMapper.getInstance();
}
@@ -252,7 +252,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
StreamsResultSet current;
synchronized ( WebHdfsPersistReader.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue));
current.setCounter(new DatumStatusCounter());
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d18bda9..fa287eb 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -21,8 +21,8 @@ package org.apache.streams.hdfs;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
-import com.google.common.base.Strings;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
do {
try {
line = bufferedReader.readLine();
- if ( !Strings.isNullOrEmpty(line) ) {
+ if (StringUtils.isNotEmpty(line)) {
reader.countersCurrent.incrementAttempt();
StreamsDatum entry = reader.lineReaderUtil.processLine(line);
if ( entry != null ) {
@@ -98,7 +98,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
}
}
- while ( !Strings.isNullOrEmpty(line) );
+ while (StringUtils.isNotEmpty(line));
LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
try {
bufferedReader.close();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 29a6b73..9079c7e 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -29,7 +29,7 @@ import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -101,7 +101,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
StringBuilder uriBuilder = new StringBuilder();
uriBuilder.append(hdfsConfiguration.getScheme());
uriBuilder.append("://");
- if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+ if (StringUtils.isNotEmpty(hdfsConfiguration.getHost())) {
uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
} else {
uriBuilder.append("/");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
index 1912ff0..b17df37 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
@@ -23,15 +23,17 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.extensions.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
-import com.google.common.collect.Sets;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -52,11 +54,12 @@ public class RegexHashtagExtractorTest {
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
- {"This is the #content of a standard tweet", Sets.newHashSet("content")},
- {"This is the content of a standard tweet", Sets.newHashSet()},
- {"This is the #content of a standard #tweet", Sets.newHashSet("content", "tweet")},
- {"UNIX \u65f6\u95f41400000000 \u79d2\u2026\u2026 \uff08\u8be5\u7761\u89c9\u4e86\uff0c\u5404\u4f4d\u591c\u732b\u5b50\uff09#\u7a0b\u5e8f\u5458#", Sets.newHashSet("\u7a0b\u5e8f\u5458")},
- {"This is the body of a #fbpost. It can have multiple lines of #content, as well as much more detailed and flowery #language.", Sets.newHashSet("content", "fbpost", "language")}
+ {"This is the #content of a standard tweet", Stream.of("content").collect(Collectors.toSet())},
+ {"This is the content of a standard tweet", new HashSet<>()},
+ {"This is the #content of a standard #tweet", Stream.of("content", "tweet").collect(Collectors.toSet())},
+ {"UNIX \u65f6\u95f41400000000 \u79d2\u2026\u2026 \uff08\u8be5\u7761\u89c9\u4e86\uff0c\u5404\u4f4d\u591c\u732b\u5b50\uff09#\u7a0b\u5e8f\u5458#", Stream.of("\u7a0b\u5e8f\u5458").collect(Collectors.toSet())},
+ {"This is the body of a #fbpost. It can have multiple lines of #content, as well as much more detailed and flowery #language.",
+ Stream.of("content", "fbpost", "language").collect(Collectors.toSet())}
});
}
@@ -68,7 +71,7 @@ public class RegexHashtagExtractorTest {
assertThat(result.size(), is(equalTo(1)));
Activity output = (Activity)result.get(0).getDocument();
Set<String> extracted = (Set) ExtensionUtil.getInstance().ensureExtensions(output).get(RegexHashtagExtractor.EXTENSION_KEY);
- Sets.SetView<String> diff = Sets.difference(extracted, hashtags);
+ Set<String> diff = extracted.stream().filter((x) -> !hashtags.contains(x)).collect(Collectors.toSet());
assertThat(diff.size(), is(equalTo(0)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
index bb0e95d..3706b15 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -54,23 +56,24 @@ public class RegexMentionExtractorTest {
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
- {"This is the @content of a standard tweet", Sets.newHashSet(new HashMap<String, Object>() {{
- put("displayName", "content");
- }})},
- {"This is the content of a standard tweet", Sets.newHashSet(new HashMap<String, Object>())},
- {"This is the @content of a standard @tweet", Sets.newHashSet(new HashMap<String, Object>() {{
+ {"This is the @content of a standard tweet", Stream.of(new HashMap<String, Object>() {{
+ put("displayName", "content");
+ }}).collect(Collectors.toSet())},
+ {"This is the content of a standard tweet", Stream.of(new HashMap<String, Object>()).collect(Collectors.toSet())},
+ {"This is the @content of a standard @tweet", Stream.of(new HashMap<String, Object>() {{
put("displayName", "content");
}},new HashMap<String, Object>() {{
put("displayName", "tweet");
- }})},
- {"UNIX \u65f6\u95f41400000000 \u79d2\u2026\u2026 \uff08\u8be5\u7761\u89c9\u4e86\uff0c\u5404\u4f4d\u591c\u732b\u5b50\uff09@\u7a0b\u5e8f\u5458#", Sets.newHashSet(new HashMap<String, Object>() {{
+ }}).collect(Collectors.toSet())},
+ {"UNIX \u65f6\u95f41400000000 \u79d2\u2026\u2026 \uff08\u8be5\u7761\u89c9\u4e86\uff0c\u5404\u4f4d\u591c\u732b\u5b50\uff09@\u7a0b\u5e8f\u5458#", Stream.of(new HashMap<String, Object>() {{
put("displayName", "\u7a0b\u5e8f\u5458");
- }})},
- {"This is the body of a @fbpost. It can have multiple lines of #content, as well as much more detailed and flowery @language.", Sets.newHashSet(new HashMap<String, Object>() {{
+ }}).collect(Collectors.toSet())},
+ {"This is the body of a @fbpost. It can have multiple lines of #content, as well as much more detailed and flowery @language.",
+ Stream.of(new HashMap<String, Object>() {{
put("displayName", "fbpost");
}},new HashMap<String, Object>() {{
put("displayName", "language");
- }})}
+ }}).collect(Collectors.toSet())}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
index 64e8599..7c7948b 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
@@ -22,15 +22,17 @@ package org.apache.streams.regex;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.json.Activity;
-import com.google.common.collect.Sets;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -51,12 +53,13 @@ public class RegexUrlExtractorTest {
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
- {"This is the http://t.co/foo of a standard tweet", Sets.newHashSet("http://t.co/foo")},
- {"This is the https://t.co/foo of a standard tweet", Sets.newHashSet("https://t.co/foo")},
- {"This is the https://t.co/foo of a standard tweet https://t.co/foo", Sets.newHashSet("https://t.co/foo")},
- {"This is the http://amd.com/test of a standard tweet", Sets.newHashSet("http://amd.com/test")},
- {"This is the content of a standard tweet", Sets.newHashSet()},
- {"This is the http://www.google.com/articles/awesome?with=query&params=true of a standard @tweet", Sets.newHashSet("http://www.google.com/articles/awesome?with=query&params=true")}
+ {"This is the http://t.co/foo of a standard tweet", Stream.of("http://t.co/foo").collect(Collectors.toSet())},
+ {"This is the https://t.co/foo of a standard tweet", Stream.of("https://t.co/foo").collect(Collectors.toSet())},
+ {"This is the https://t.co/foo of a standard tweet https://t.co/foo", Stream.of("https://t.co/foo").collect(Collectors.toSet())},
+ {"This is the http://amd.com/test of a standard tweet", Stream.of("http://amd.com/test").collect(Collectors.toSet())},
+ {"This is the content of a standard tweet", new HashSet<>()},
+ {"This is the http://www.google.com/articles/awesome?with=query&params=true of a standard @tweet",
+ Stream.of("http://www.google.com/articles/awesome?with=query&params=true").collect(Collectors.toSet())}
});
}
@@ -67,8 +70,8 @@ public class RegexUrlExtractorTest {
List<StreamsDatum> result = new RegexUrlExtractor().process(datum);
assertThat(result.size(), is(equalTo(1)));
Activity output = (Activity)result.get(0).getDocument();
- Set<String> extracted = Sets.newHashSet(output.getLinks());
- Sets.SetView<String> diff = Sets.difference(links, extracted);
+ Set<String> extracted = new HashSet<>(output.getLinks());
+ Set<String> diff = links.stream().filter((x) -> !extracted.contains(x)).collect(Collectors.toSet());
assertThat(diff.size(), is(equalTo(0)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index 52ec222..0a13b64 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -27,10 +27,10 @@ import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import facebook4j.Facebook;
import facebook4j.FacebookFactory;
import facebook4j.conf.ConfigurationBuilder;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,7 +112,7 @@ public abstract class FacebookDataCollector implements Runnable {
LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
}
cb.setJSONStoreEnabled(true);
- if (!Strings.isNullOrEmpty(config.getVersion())) {
+ if (StringUtils.isNotEmpty(config.getVersion())) {
cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/");
}
LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
index 14d5a64..e256418 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -29,8 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
import facebook4j.Facebook;
@@ -54,6 +52,7 @@ import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -166,7 +165,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
StreamsResultSet current;
synchronized (FacebookUserstreamProvider.class) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue));
current.setCounter(new DatumStatusCounter());
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
@@ -220,7 +219,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
@Override
public void prepare(Object configurationObject) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ executor = newFixedThreadPoolWithQueueSize(5, 20);
Objects.requireNonNull(providerQueue);
Objects.requireNonNull(this.klass);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
index c973863..39b7771 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -30,8 +30,6 @@ import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
import facebook4j.Facebook;
@@ -51,6 +49,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
@@ -63,6 +62,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
/**
* FacebookFriendUpdatesProvider provides updates from friend feed.
@@ -237,7 +237,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa
@Override
public void prepare(Object configurationObject) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ executor = newFixedThreadPoolWithQueueSize(5, 20);
Objects.requireNonNull(providerQueue);
Objects.requireNonNull(this.klass);
@@ -289,7 +289,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa
FacebookUserstreamProvider provider;
Facebook client;
- private Set<Post> priorPollResult = Sets.newHashSet();
+ private Set<Post> priorPollResult = new HashSet<>();
public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
provider = facebookUserstreamProvider;
@@ -301,9 +301,9 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa
while (provider.isRunning()) {
try {
ResponseList<Post> postResponseList = client.getHome();
- Set<Post> update = Sets.newHashSet(postResponseList);
- Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
- Set<Post> entrySet = Sets.difference(update, repeats);
+ Set<Post> update = new HashSet<>(postResponseList);
+ Set<Post> repeats = priorPollResult.stream().filter(update::contains).collect(Collectors.toSet());
+ Set<Post> entrySet = update.stream().filter((x) -> !repeats.contains(x)).collect(Collectors.toSet());
for (Post item : entrySet) {
String json = DataObjectFactory.getRawJSON(item);
org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index a665023..70e0a65 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -29,8 +29,6 @@ import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.SerializationUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -43,11 +41,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -105,7 +105,7 @@ public abstract class FacebookProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
int batchSize = 0;
- BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+ BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch);
++batchSize;
@@ -125,7 +125,7 @@ public abstract class FacebookProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
- this.datums = Queues.newLinkedBlockingQueue();
+ this.datums = new LinkedBlockingQueue<>();
this.isComplete = new AtomicBoolean(false);
this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
}
@@ -141,7 +141,7 @@ public abstract class FacebookProvider implements StreamsProvider {
* @param idsToAfterDate idsToAfterDate
*/
public void overrideIds(Map<String, DateTime> idsToAfterDate) {
- Set<IdConfig> ids = Sets.newHashSet();
+ Set<IdConfig> ids = new HashSet<>();
for (String id : idsToAfterDate.keySet()) {
IdConfig idConfig = new IdConfig();
idConfig.setId(id);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
index d198e43..c502ada 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -29,10 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
import facebook4j.Facebook;
@@ -51,11 +47,13 @@ import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,6 +61,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
@@ -88,7 +87,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
this.configuration = config;
}
- protected ListeningExecutorService executor;
+ protected ExecutorService executor;
protected DateTime start;
protected DateTime end;
@@ -187,7 +186,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
StreamsResultSet current;
synchronized (FacebookUserstreamProvider.class) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue));
current.setCounter(new DatumStatusCounter());
current.getCounter().add(countersCurrent);
countersTotal.add(countersCurrent);
@@ -241,7 +240,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
@Override
public void prepare(Object configurationObject) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ executor = newFixedThreadPoolWithQueueSize(5, 20);
Objects.requireNonNull(providerQueue);
Objects.requireNonNull(this.klass);
@@ -299,7 +298,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
Facebook client;
String id;
- private Set<Post> priorPollResult = Sets.newHashSet();
+ private Set<Post> priorPollResult = new HashSet<>();
public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
this.provider = facebookUserstreamProvider;
@@ -318,9 +317,9 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable
try {
postResponseList = client.getFeed(id);
- Set<Post> update = Sets.newHashSet(postResponseList);
- Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
- Set<Post> entrySet = Sets.difference(update, repeats);
+ Set<Post> update = new HashSet<>(postResponseList);
+ Set<Post> repeats = priorPollResult.stream().filter(update::contains).collect(Collectors.toSet());
+ Set<Post> entrySet = update.stream().filter((x) -> !repeats.contains(x)).collect(Collectors.toSet());
LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
for (Post item : entrySet) {
String json = DataObjectFactory.getRawJSON(item);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
index f7b9d88..a295898 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -106,7 +105,7 @@ public class MoreoverProvider implements StreamsProvider {
Collection<StreamsDatum> currentIterator = new ArrayList<>();
Iterators.addAll(currentIterator, providerQueue.iterator());
- StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+ StreamsResultSet current = new StreamsResultSet(new ConcurrentLinkedQueue<>(currentIterator));
providerQueue.clear();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
index 88aec81..595fb4f 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
@@ -37,7 +37,6 @@ public class MoreoverProviderTask implements Runnable {
private String lastSequence;
private final String apiKey;
- private final String apiId;
private final Queue<StreamsDatum> results;
private final MoreoverClient moClient;
private boolean started = false;
@@ -51,11 +50,10 @@ public class MoreoverProviderTask implements Runnable {
*/
public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
//logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
- this.apiId = apiId;
this.apiKey = apiKey;
this.results = results;
this.lastSequence = lastSequence;
- this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
+ this.moClient = new MoreoverClient(apiId, this.apiKey, this.lastSequence);
initializeClient(moClient);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index ec1f317..fffe7a1 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -32,8 +32,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sysomos.SysomosConfiguration;
import com.typesafe.config.Config;
@@ -49,10 +47,11 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.math.BigInteger;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -76,7 +75,7 @@ public class SysomosProvider implements StreamsProvider {
public static final String STREAMS_ID = "SysomosProvider";
- public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
+ public enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
private static final Logger LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
@@ -91,7 +90,7 @@ public class SysomosProvider implements StreamsProvider {
protected volatile Queue<StreamsDatum> providerQueue;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private final Set<String> completedHeartbeats = Sets.newHashSet();
+ private final Set<String> completedHeartbeats = new HashSet<>();
private final long maxQueued;
private final long minLatency;
private final long scheduledLatency;
@@ -332,7 +331,7 @@ public class SysomosProvider implements StreamsProvider {
}
private Queue<StreamsDatum> constructQueue() {
- return Queues.newConcurrentLinkedQueue();
+ return new ConcurrentLinkedQueue<>();
}
public int getCount() {
@@ -379,9 +378,7 @@ public class SysomosProvider implements StreamsProvider {
provider.startStream();
do {
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while (iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
+ for (StreamsDatum datum : provider.readCurrent()) {
String json;
try {
json = mapper.writeValueAsString(datum.getDocument());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
index 82d538d..0acc81f 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
@@ -21,8 +21,8 @@ package org.apache.streams.sysomos.util;
import org.apache.streams.sysomos.SysomosException;
-import com.google.common.base.Strings;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
@@ -66,7 +66,7 @@ public class SysomosUtils {
writer.flush();
String xmlResponse = writer.toString();
- if (Strings.isNullOrEmpty(xmlResponse)) {
+ if (StringUtils.isEmpty(xmlResponse)) {
throw new SysomosException("XML Response from Sysomos was empty : "
+ xmlResponse
+ "\n"
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index 571c0fc..b85ad4a 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -19,14 +19,13 @@
package org.apache.streams.local.test.providers;
-import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Provides new, empty instances of result set.
@@ -45,17 +44,17 @@ public class EmptyResultSetProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ return new StreamsResultSet(new LinkedBlockingQueue<>());
}
@Override
public StreamsResultSet readNew(BigInteger sequence) {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ return new StreamsResultSet(new LinkedBlockingQueue<>());
}
@Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
- return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+ return new StreamsResultSet(new LinkedBlockingQueue<>());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 88494a8..21f37ba 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -22,12 +22,13 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages.
@@ -57,7 +58,7 @@ public class NumericMessageProvider implements StreamsProvider {
@Override
public StreamsResultSet readCurrent() {
int batchSize = 0;
- Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+ Queue<StreamsDatum> batch = new LinkedBlockingQueue<>();
try {
while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
batch.add(this.data.take());
@@ -97,7 +98,7 @@ public class NumericMessageProvider implements StreamsProvider {
}
private BlockingQueue<StreamsDatum> constructQueue() {
- BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages);
+ BlockingQueue<StreamsDatum> datums = new ArrayBlockingQueue<>(numMessages);
for(int i=0;i<numMessages;i++) {
datums.add(new StreamsDatum(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 0fbfae9..632d079 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -22,12 +22,12 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
-import com.google.common.collect.Queues;
import org.joda.time.DateTime;
import java.math.BigInteger;
import java.util.Queue;
import java.util.Scanner;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* FOR TESTING PURPOSES ONLY.
@@ -91,7 +91,7 @@ public class FileReaderProvider implements StreamsProvider {
}
private Queue<StreamsDatum> constructQueue(Scanner scanner) {
- Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+ Queue<StreamsDatum> data = new LinkedBlockingQueue<>();
while(scanner.hasNextLine()) {
data.add(converter.convert(scanner.nextLine()));
}