You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/11/26 18:08:04 UTC
[3/7] incubator-streams git commit: STREAMS-441: Remove compile
dependency on guava for core packages,
this closes apache/incubator-streams#320
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index 9de1863..81b4f79 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -25,7 +25,6 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import org.apache.streams.util.ComponentUtils;
@@ -47,11 +46,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -65,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class RssStreamProvider implements StreamsProvider {
- public static final String STREAMS_ID = "RssStreamProvider";
+ private static final String STREAMS_ID = "RssStreamProvider";
private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class);
@@ -142,7 +137,7 @@ public class RssStreamProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
- this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
this.dataQueue = new LinkedBlockingQueue<>();
this.scheduler = getScheduler(this.dataQueue);
this.isComplete = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
index e4bfd35..975749f 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java
@@ -22,10 +22,10 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.provider.RssStreamProviderTask;
-import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -64,7 +64,7 @@ public class RssFeedScheduler implements Runnable {
this.feedDetailsList = feedDetailsList;
this.peroid = peroid;
this.keepRunning = new AtomicBoolean(true);
- this.lastScheduled = Maps.newHashMap();
+ this.lastScheduled = new HashMap<>();
this.dataQueue = dataQueue;
this.complete = new AtomicBoolean(false);
}
@@ -116,7 +116,7 @@ public class RssFeedScheduler implements Runnable {
}
if (currentTime - lastTime > pollInterval) {
this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl()));
- this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
+ LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl());
this.lastScheduled.put(detail.getUrl(), currentTime);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
index 1e3aedd..45d5d96 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -38,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> {
@@ -84,7 +84,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod
* @return Activity
*/
public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) {
- Preconditions.checkNotNull(entry);
+ Objects.requireNonNull(entry);
Activity activity = new Activity();
Provider provider = buildProvider(entry);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
index 6868bfc..63920e7 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java
@@ -21,7 +21,6 @@ package org.apache.streams.rss.serializer;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import com.sun.syndication.feed.module.Module;
import com.sun.syndication.feed.rss.Category;
import com.sun.syndication.feed.rss.Content;
@@ -37,7 +36,6 @@ import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -250,9 +248,7 @@ public class SyndEntrySerializer {
}
private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) {
- if (links == null || links.size() == 0) {
- return;
- } else if (links.get(0) instanceof String) {
+ if (links.get(0) instanceof String) {
serializeListOfStrings(links, "links", root, factory);
} else if (links.get(0) instanceof SyndLinkImpl) {
ArrayNode linksArray = factory.arrayNode();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
index 08a58d3..102c3db 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java
@@ -24,13 +24,13 @@ import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.google.common.collect.Queues;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Unit tests for {@link org.apache.streams.rss.provider.RssStreamProvider}
@@ -44,7 +44,7 @@ public class RssStreamProviderTest extends RandomizedTest {
RssStreamProvider provider = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
- BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue();
+ BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
provider = new RssStreamProvider(new RssStreamConfiguration()) {
@Override
protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
index 830f0e7..779c2ab 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java
@@ -18,16 +18,13 @@
package org.apache.streams.rss.provider.perpetual;
-import org.apache.streams.core.StreamsDatum;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.provider.RssStreamProviderTask;
-import com.google.common.collect.Lists;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -51,15 +48,12 @@ public class RssFeedSchedulerTest {
public void testScheduleFeeds() {
ExecutorService mockService = mock(ExecutorService.class);
final List<String> queuedTasks = new ArrayList<>(5);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed());
+ return null;
}).when(mockService).execute(any(Runnable.class));
- RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1);
+ RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<>(), 1);
scheduler.scheduleFeeds();
assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size());
assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0));
@@ -78,7 +72,7 @@ public class RssFeedSchedulerTest {
}
private List<FeedDetails> createFeedList() {
- List<FeedDetails> list = Lists.newLinkedList();
+ List<FeedDetails> list = new LinkedList<>();
FeedDetails fd = new FeedDetails();
fd.setPollIntervalMillis(1L);
fd.setUrl("1");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
index ccd8b74..c3f10f9 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
@@ -26,7 +26,6 @@ import org.apache.streams.rss.provider.RssStreamProvider;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -43,13 +42,11 @@ import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
import java.util.List;
import static org.hamcrest.number.OrderingComparison.greaterThan;
-/**
- * Created by sblackmon on 2/5/14.
- */
public class RssStreamProviderIT {
private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
@@ -67,7 +64,7 @@ public class RssStreamProviderIT {
BufferedReader br = new BufferedReader(isr);
RssStreamConfiguration configuration = new RssStreamConfiguration();
- List<FeedDetails> feedArray = Lists.newArrayList();
+ List<FeedDetails> feedArray = new ArrayList<>();
try {
while (br.ready()) {
String line = br.readLine();
@@ -77,7 +74,6 @@ public class RssStreamProviderIT {
}
configuration.setFeeds(feedArray);
} catch ( Exception ex ) {
- System.out.println(ex);
ex.printStackTrace();
Assert.fail();
}
@@ -101,7 +97,7 @@ public class RssStreamProviderIT {
assert (config.canRead());
assert (config.isFile());
- RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+ RssStreamProvider.main(new String[]{configfile, outfile});
File out = new File(outfile);
assert (out.exists());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
index 01f1999..37ff7cf 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java
@@ -27,7 +27,6 @@ import org.apache.streams.rss.serializer.SyndEntryActivitySerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Test;
@@ -35,7 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Scanner;
import static org.junit.Assert.assertEquals;
@@ -54,8 +55,8 @@ public class SyndEntryActivitySerializerIT {
@Test
public void testJsonData() throws Exception {
Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt"));
- List<Activity> activities = Lists.newLinkedList();
- List<ObjectNode> objects = Lists.newLinkedList();
+ List<Activity> activities = new LinkedList<>();
+ List<ObjectNode> objects = new LinkedList<>();
SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer();
@@ -125,7 +126,8 @@ public class SyndEntryActivitySerializerIT {
}
public void testUrl(String expectedUri, String expectedLink, Activity activity) {
- assertTrue((expectedUri == activity.getUrl() || expectedLink == activity.getUrl()));
- assertTrue((expectedUri == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl()));
+ assertTrue((Objects.equals(expectedUri, activity.getUrl()) || Objects.equals(expectedLink, activity.getUrl())));
+ assertTrue((Objects.equals(expectedUri, activity.getObject().getUrl()) ||
+ Objects.equals(expectedLink, activity.getObject().getUrl())));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
index 6b59d1e..95d8ad9 100644
--- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
+++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java
@@ -19,11 +19,7 @@
package org.apache.streams.sysomos.provider;
-import org.apache.streams.sysomos.data.HeartbeatInfo;
-import org.apache.streams.sysomos.util.SysomosUtils;
-
import java.net.HttpURLConnection;
-import java.net.URL;
/**
* Wrapper for the Sysomos API.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index f555e8d..915a8cf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -30,12 +30,12 @@ import org.apache.streams.twitter.pojo.UserstreamEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT;
@@ -47,9 +47,9 @@ public class TwitterDocumentClassifier implements DocumentClassifier {
@Override
public List<Class> detectClasses(Object document) {
- Preconditions.checkNotNull(document);
+ Objects.requireNonNull(document);
- ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT));
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TWITTER_FORMAT));
ObjectNode objectNode;
try {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
index b8ce79b..9ca354f 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java
@@ -23,16 +23,17 @@ import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.pojo.json.Activity;
import org.apache.streams.twitter.pojo.User;
-import com.google.common.collect.Lists;
import org.apache.commons.lang.NotImplementedException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
public class TwitterJsonUserActivityConverter implements ActivityConverter<User> {
- public static Class requiredClass = User.class;
+ private static Class requiredClass = User.class;
@Override
public Class requiredClass() {
@@ -67,12 +68,12 @@ public class TwitterJsonUserActivityConverter implements ActivityConverter<User>
Activity activity = new Activity();
updateActivity(user, activity);
- return Lists.newArrayList(activity);
+ return Collections.singletonList(activity);
}
@Override
public List<Activity> toActivityList(List<User> serializedList) {
- List<Activity> result = Lists.newArrayList();
+ List<Activity> result = new ArrayList<>();
for ( User item : serializedList ) {
try {
List<Activity> activities = toActivityList(item);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index e0e2e80..3344d05 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -32,15 +32,14 @@ import org.apache.streams.twitter.pojo.Entities;
import org.apache.streams.twitter.pojo.Hashtag;
import org.apache.streams.twitter.pojo.Place;
import org.apache.streams.twitter.pojo.Retweet;
+import org.apache.streams.twitter.pojo.TargetObject;
import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.twitter.pojo.UserMentions;
-import org.apache.streams.twitter.provider.TwitterErrorHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
@@ -50,6 +49,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static com.google.common.math.DoubleMath.mean;
@@ -71,13 +71,11 @@ public class TwitterActivityUtil {
public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException {
activity.setActor(buildActor(tweet));
activity.setId(formatId(activity.getVerb(),
- Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull()));
+ Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+ .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null)));
if (tweet instanceof Retweet) {
- updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share");
+ updateActivityContent(activity, (tweet).getRetweetedStatus(), "share");
} else {
updateActivityContent(activity, tweet, "post");
}
@@ -127,7 +125,7 @@ public class TwitterActivityUtil {
}
/**
- * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet
+ * Builds the activity {@link ActivityObject} actor from the tweet
* @param tweet the object to use as the source
* @return a valid Actor populated from the Tweet
*/
@@ -139,17 +137,15 @@ public class TwitterActivityUtil {
}
/**
- * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User
+ * Builds the activity {@link ActivityObject} actor from the User
* @param user the object to use as the source
* @return a valid Actor populated from the Tweet
*/
public static ActivityObject buildActor(User user) {
ActivityObject actor = new ActivityObject();
actor.setId(formatId(
- Optional.fromNullable(
- user.getIdStr())
- .or(Optional.of(user.getId().toString()))
- .orNull()
+ Optional.ofNullable(Optional.ofNullable(user.getIdStr())
+ .orElseGet(Optional.of(user.getId().toString())::get)).orElse(null)
));
actor.setObjectType("page");
actor.setDisplayName(user.getName());
@@ -189,16 +185,14 @@ public class TwitterActivityUtil {
}
/**
- * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet
+ * Creates an {@link ActivityObject} for the tweet
* @param tweet the object to use as the source
* @return a valid ActivityObject
*/
public static ActivityObject buildActivityObject(Tweet tweet) {
ActivityObject actObj = new ActivityObject();
- String id = Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull();
+ String id = Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+ .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null);
if ( id != null ) {
actObj.setId(id);
}
@@ -261,7 +255,7 @@ public class TwitterActivityUtil {
}
/**
- * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet.
+ * Builds the {@link TargetObject} from the tweet.
* @param tweet the object to use as the source
* @return currently returns null for all activities
*/
@@ -278,17 +272,15 @@ public class TwitterActivityUtil {
Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
Map<String, Object> location = new HashMap<>();
location.put("id", formatId(
- Optional.fromNullable(
- tweet.getIdStr())
- .or(Optional.of(tweet.getId().toString()))
- .orNull()
+ Optional.ofNullable(Optional.ofNullable(tweet.getIdStr())
+ .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null)
));
location.put("coordinates", boundingBoxCenter(tweet.getPlace()));
extensions.put("location", location);
}
/**
- * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object
+ * Gets the common twitter {@link Provider} object
* @return a provider object representing Twitter
*/
public static Provider getProvider() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
index ee800fa..764b3ee 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
@@ -33,6 +33,8 @@ import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;
+import java.util.Objects;
+
/**
* Retrieve friend or follower connections for a single user id.
*/
@@ -43,11 +45,11 @@ public class TwitterFollowingProviderTask implements Runnable {
private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
protected TwitterFollowingProvider provider;
- protected Twitter client;
+ private Twitter client;
protected Long id;
- protected String screenName;
+ private String screenName;
- int count = 0;
+ private int count = 0;
/**
* TwitterFollowingProviderTask constructor.
@@ -81,7 +83,7 @@ public class TwitterFollowingProviderTask implements Runnable {
if ( id != null ) {
getFollowing(id);
- } else if ( screenName != null) {
+ } else {
getFollowing(screenName);
}
@@ -89,7 +91,7 @@ public class TwitterFollowingProviderTask implements Runnable {
}
- protected void getFollowing(Long id) {
+ private void getFollowing(Long id) {
Preconditions.checkArgument(
provider.getConfig().getEndpoint().equals("friends")
@@ -103,7 +105,7 @@ public class TwitterFollowingProviderTask implements Runnable {
}
}
- protected void getFollowing(String screenName) {
+ private void getFollowing(String screenName) {
twitter4j.User user = null;
try {
@@ -111,7 +113,7 @@ public class TwitterFollowingProviderTask implements Runnable {
} catch (TwitterException ex) {
LOGGER.error("Failure looking up " + id);
}
- Preconditions.checkNotNull(user);
+ Objects.requireNonNull(user);
getFollowing(user.getId());
}
@@ -134,12 +136,12 @@ public class TwitterFollowingProviderTask implements Runnable {
PagableResponseList<twitter4j.User> list = null;
if ( provider.getConfig().getEndpoint().equals("followers") ) {
- list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ list = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getMaxItems().intValue());
} else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ list = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getMaxItems().intValue());
}
- Preconditions.checkNotNull(list);
+ Objects.requireNonNull(list);
Preconditions.checkArgument(list.size() > 0);
for (twitter4j.User other : list) {
@@ -176,8 +178,6 @@ public class TwitterFollowingProviderTask implements Runnable {
break;
}
curser = list.getNextCursor();
- } catch (TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
} catch (Exception ex) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
}
@@ -194,12 +194,12 @@ public class TwitterFollowingProviderTask implements Runnable {
try {
twitter4j.IDs ids = null;
if ( provider.getConfig().getEndpoint().equals("followers") ) {
- ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue());
} else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue());
+ ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue());
}
- Preconditions.checkNotNull(ids);
+ Objects.requireNonNull(ids);
Preconditions.checkArgument(ids.getIDs().length > 0);
for (long otherId : ids.getIDs()) {
@@ -216,7 +216,7 @@ public class TwitterFollowingProviderTask implements Runnable {
.withFollower(new User().withId(id));
}
- Preconditions.checkNotNull(follow);
+ Objects.requireNonNull(follow);
if ( count < provider.getConfig().getMaxItems()) {
ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
index a4562ef..2be77ac 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java
@@ -24,7 +24,6 @@ import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +99,9 @@ public class TwitterStreamHelper extends StringDelimitedProcessor {
Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0);
Object document = mapper.readValue(item, itemClass);
StreamsDatum rawDatum = new StreamsDatum(document);
- return Lists.newArrayList(rawDatum);
+ List<StreamsDatum> streamsDatumList = new ArrayList<>();
+ streamsDatumList.add(rawDatum);
+ return streamsDatumList;
}
return new ArrayList<>();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 1895ee2..a37ec4d 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -35,8 +35,6 @@ import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
@@ -67,25 +65,24 @@ import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * TwitterStreamProvider wraps a hosebird client and passes recieved documents
+ * TwitterStreamProvider wraps a hosebird client and passes received documents
* to subscribing components.
*/
public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable {
- public static final String STREAMS_ID = "TwitterStreamProvider";
+ private static final String STREAMS_ID = "TwitterStreamProvider";
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class);
@@ -127,9 +124,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter");
TwitterStreamProvider provider = new TwitterStreamProvider(config);
- ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
- PrintStream outStream = null;
+ PrintStream outStream;
try {
outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
} catch (FileNotFoundException ex) {
@@ -140,9 +137,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
provider.startStream();
do {
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while (iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
+ for (StreamsDatum datum : provider.readCurrent()) {
String json;
try {
json = mapper.writeValueAsString(datum.getDocument());
@@ -157,7 +152,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
outStream.flush();
}
- public static final int MAX_BATCH = 1000;
+ private static final int MAX_BATCH = 1000;
private TwitterStreamConfiguration config;
@@ -169,13 +164,12 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
this.config = config;
}
- protected volatile Queue<Future<List<StreamsDatum>>> providerQueue;
+ private volatile Queue<Future<List<StreamsDatum>>> providerQueue;
- protected Hosts hosebirdHosts;
- protected Authentication auth;
+ private Authentication auth;
protected StreamingEndpoint endpoint;
- protected BasicClient client;
- protected AtomicBoolean running = new AtomicBoolean(false);
+ private BasicClient client;
+ private AtomicBoolean running = new AtomicBoolean(false);
protected TwitterStreamHelper processor = new TwitterStreamHelper(this);
private DatumStatusCounter countersCurrent = new DatumStatusCounter();
private DatumStatusCounter countersTotal = new DatumStatusCounter();
@@ -204,7 +198,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
StreamsResultSet current;
synchronized (this) {
- Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque();
+ Queue<StreamsDatum> drain = new LinkedBlockingDeque<>();
drainTo(drain);
current = new StreamsResultSet(drain);
current.setCounter(new DatumStatusCounter());
@@ -234,8 +228,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
@Override
public void prepare(Object configurationObject) {
- Preconditions.checkNotNull(config.getEndpoint());
+ Objects.requireNonNull(config.getEndpoint());
+ Hosts hosebirdHosts;
if (config.getEndpoint().equals("userstream") ) {
hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST);
@@ -276,8 +271,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
if ( config.getBasicauth() != null ) {
- Preconditions.checkNotNull(config.getBasicauth().getUsername());
- Preconditions.checkNotNull(config.getBasicauth().getPassword());
+ Objects.requireNonNull(config.getBasicauth().getUsername());
+ Objects.requireNonNull(config.getBasicauth().getPassword());
auth = new BasicAuth(
config.getBasicauth().getUsername(),
@@ -286,10 +281,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat
} else if ( config.getOauth() != null ) {
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+ Objects.requireNonNull(config.getOauth().getConsumerKey());
+ Objects.requireNonNull(config.getOauth().getConsumerSecret());
+ Objects.requireNonNull(config.getOauth().getAccessToken());
+ Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
auth = new OAuth1(config.getOauth().getConsumerKey(),
config.getOauth().getConsumerSecret(),
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 3210f80..214d204 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -35,7 +35,6 @@ import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -47,7 +46,6 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.Twitter;
-import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;
@@ -59,8 +57,10 @@ import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
@@ -78,9 +78,9 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
*/
public class TwitterUserInformationProvider implements StreamsProvider, Serializable {
- public static final String STREAMS_ID = "TwitterUserInformationProvider";
+ private static final String STREAMS_ID = "TwitterUserInformationProvider";
- private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT));
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class);
@@ -133,9 +133,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
provider.startStream();
do {
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while (iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
+ for (StreamsDatum datum : provider.readCurrent()) {
String json;
try {
json = MAPPER.writeValueAsString(datum.getDocument());
@@ -176,7 +174,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
return new ThreadPoolExecutor(numThreads, numThreads,
5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
@@ -207,13 +205,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
config = (TwitterUserInformationConfiguration) configurationObject;
}
- Preconditions.checkNotNull(config);
- Preconditions.checkNotNull(config.getOauth());
- Preconditions.checkNotNull(config.getOauth().getConsumerKey());
- Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
- Preconditions.checkNotNull(config.getOauth().getAccessToken());
- Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
- Preconditions.checkNotNull(config.getInfo());
+ Objects.requireNonNull(config);
+ Objects.requireNonNull(config.getOauth());
+ Objects.requireNonNull(config.getOauth().getConsumerKey());
+ Objects.requireNonNull(config.getOauth().getConsumerSecret());
+ Objects.requireNonNull(config.getOauth().getAccessToken());
+ Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
+ Objects.requireNonNull(config.getInfo());
try {
lock.writeLock().lock();
@@ -222,13 +220,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
lock.writeLock().unlock();
}
- Preconditions.checkNotNull(providerQueue);
+ Objects.requireNonNull(providerQueue);
- List<String> screenNames = new ArrayList<String>();
- List<String[]> screenNameBatches = new ArrayList<String[]>();
+ List<String> screenNames = new ArrayList<>();
+ List<String[]> screenNameBatches = new ArrayList<>();
- List<Long> ids = new ArrayList<Long>();
- List<Long[]> idsBatches = new ArrayList<Long[]>();
+ List<Long> ids = new ArrayList<>();
+ List<Long[]> idsBatches = new ArrayList<>();
for (String s : config.getInfo()) {
if (s != null) {
@@ -248,14 +246,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
// add the batch
idsBatches.add(ids.toArray(new Long[ids.size()]));
// reset the Ids
- ids = new ArrayList<Long>();
+ ids = new ArrayList<>();
}
if (screenNames.size() >= 100) {
// add the batch
screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
// reset the Ids
- screenNames = new ArrayList<String>();
+ screenNames = new ArrayList<>();
}
}
}
@@ -275,7 +273,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
}
- Preconditions.checkNotNull(executor);
+ Objects.requireNonNull(executor);
this.idsBatches = idsBatches.iterator();
this.screenNameBatches = screenNameBatches.iterator();
@@ -284,7 +282,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
@Override
public void startStream() {
- Preconditions.checkNotNull(executor);
+ Objects.requireNonNull(executor);
Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
@@ -327,8 +325,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
}
keepTrying = 10;
- } catch (TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
} catch (Exception ex) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
}
@@ -353,8 +349,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
}
keepTrying = 10;
- } catch (TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
} catch (Exception ex) {
keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
}
@@ -383,7 +377,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
protected Queue<StreamsDatum> constructQueue() {
- return new LinkedBlockingQueue<StreamsDatum>();
+ return new LinkedBlockingQueue<>();
}
public StreamsResultSet readNew(BigInteger sequence) {
@@ -397,8 +391,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
this.start = start;
this.end = end;
readCurrent();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
+ return (StreamsResultSet)providerQueue.iterator();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
index 6e269e5..d8967d1 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.api.services.youtube.model.Channel;
import com.google.api.services.youtube.model.Video;
-import com.google.common.collect.Lists;
import com.youtube.serializer.YoutubeActivityUtil;
import com.youtube.serializer.YoutubeChannelDeserializer;
import com.youtube.serializer.YoutubeEventClassifier;
@@ -35,6 +34,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -47,7 +47,6 @@ public class YoutubeTypeConverter implements StreamsProcessor {
private StreamsJacksonMapper mapper;
private Queue<Video> inQueue;
private Queue<StreamsDatum> outQueue;
- private YoutubeActivityUtil youtubeActivityUtil;
private int count = 0;
public YoutubeTypeConverter() {}
@@ -65,7 +64,7 @@ public class YoutubeTypeConverter implements StreamsProcessor {
Object item = streamsDatum.getDocument();
LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
- Activity activity = null;
+ Activity activity;
if (item instanceof String) {
item = deserializeItem(item);
@@ -73,10 +72,10 @@ public class YoutubeTypeConverter implements StreamsProcessor {
if (item instanceof Video) {
activity = new Activity();
- youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId());
+ YoutubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId());
} else if (item instanceof Channel) {
activity = new Activity();
- this.youtubeActivityUtil.updateActivity((Channel)item, activity, null);
+ YoutubeActivityUtil.updateActivity((Channel)item, activity, null);
} else {
throw new NotImplementedException("Type conversion not implement for type : " + item.getClass().getName());
}
@@ -90,9 +89,11 @@ public class YoutubeTypeConverter implements StreamsProcessor {
}
if ( result != null ) {
- return Lists.newArrayList(result);
+ List<StreamsDatum> streamsDatumList = new ArrayList<>();
+ streamsDatumList.add(result);
+ return streamsDatumList;
} else {
- return Lists.newArrayList();
+ return new ArrayList<>();
}
}
@@ -113,7 +114,6 @@ public class YoutubeTypeConverter implements StreamsProcessor {
@Override
public void prepare(Object configurationObject) {
- youtubeActivityUtil = new YoutubeActivityUtil();
mapper = StreamsJacksonMapper.getInstance();
SimpleModule simpleModule = new SimpleModule();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
index 1442f8b..401b836 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java
@@ -38,8 +38,6 @@ import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.repackaged.com.google.common.base.Strings;
import com.google.api.services.youtube.YouTube;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -54,9 +52,11 @@ import java.io.IOException;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
@@ -65,24 +65,24 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class YoutubeProvider implements StreamsProvider {
- public static final String STREAMS_ID = "YoutubeProvider";
+ private static final String STREAMS_ID = "YoutubeProvider";
private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class);
private static final int MAX_BATCH_SIZE = 1000;
// This OAuth 2.0 access scope allows for full read/write access to the
// authenticated user's account.
- private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube");
+ private List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube");
/**
* Define a global instance of the HTTP transport.
*/
- public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
+ private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport();
/**
* Define a global instance of the JSON factory.
*/
- public static final JsonFactory JSON_FACTORY = new JacksonFactory();
+ private static final JsonFactory JSON_FACTORY = new JacksonFactory();
private static final int DEFAULT_THREAD_POOL_SIZE = 5;
@@ -104,7 +104,7 @@ public abstract class YoutubeProvider implements StreamsProvider {
this.config = new ComponentConfigurator<>(YoutubeConfiguration.class)
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube"));
- Preconditions.checkNotNull(this.config.getApiKey());
+ Objects.requireNonNull(this.config.getApiKey());
}
/**
@@ -114,7 +114,7 @@ public abstract class YoutubeProvider implements StreamsProvider {
public YoutubeProvider(YoutubeConfiguration config) {
this.config = config;
- Preconditions.checkNotNull(this.config.getApiKey());
+ Objects.requireNonNull(this.config.getApiKey());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
index 4754353..0ca161f 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java
@@ -26,13 +26,12 @@ import org.apache.streams.pojo.json.ActivityObject;
import org.apache.streams.pojo.json.Image;
import org.apache.streams.pojo.json.Provider;
-import com.google.api.client.util.Maps;
+import com.google.api.services.youtube.YouTube;
import com.google.api.services.youtube.model.Channel;
import com.google.api.services.youtube.model.Thumbnail;
import com.google.api.services.youtube.model.ThumbnailDetails;
import com.google.api.services.youtube.model.Video;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -40,14 +39,15 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class YoutubeActivityUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class);
/**
- * Given a {@link com.google.api.services.youtube.YouTube.Videos} object and an
- * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details
+ * Given a {@link YouTube.Videos} object and an
+ * {@link Activity} object, fill out the appropriate details
*
* @param video Video
* @param activity Activity
@@ -57,10 +57,7 @@ public class YoutubeActivityUtil {
activity.setActor(buildActor(video, video.getSnippet().getChannelId()));
activity.setVerb("post");
- activity.setId(formatId(activity.getVerb(),
- Optional.fromNullable(
- video.getId())
- .orNull()));
+ activity.setId(formatId(activity.getVerb(), Optional.ofNullable(video.getId()).orElse(null)));
activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue()));
activity.setTitle(video.getSnippet().getTitle());
@@ -76,8 +73,8 @@ public class YoutubeActivityUtil {
/**
- * Given a {@link com.google.api.services.youtube.model.Channel} object and an
- * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details
+ * Given a {@link Channel} object and an
+ * {@link Activity} object, fill out the appropriate details
*
* @param channel Channel
* @param activity Activity
@@ -88,7 +85,7 @@ public class YoutubeActivityUtil {
activity.setProvider(getProvider());
activity.setVerb("post");
activity.setActor(createActorForChannel(channel));
- Map<String, Object> extensions = Maps.newHashMap();
+ Map<String, Object> extensions = new HashMap<>();
extensions.put("youtube", channel);
activity.setAdditionalProperty("extensions", extensions);
} catch (Throwable throwable) {
@@ -111,7 +108,7 @@ public class YoutubeActivityUtil {
image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl());
actor.setImage(image);
actor.setUrl("https://youtube.com/user/" + channel.getId());
- Map<String, Object> actorExtensions = Maps.newHashMap();
+ Map<String, Object> actorExtensions = new HashMap<>();
actorExtensions.put("followers", channel.getStatistics().getSubscriberCount());
actorExtensions.put("posts", channel.getStatistics().getVideoCount());
actor.setAdditionalProperty("extensions", actorExtensions);
@@ -163,7 +160,7 @@ public class YoutubeActivityUtil {
}
/**
- * Build an {@link org.apache.streams.pojo.json.ActivityObject} actor given the video object
+ * Build an {@link ActivityObject} actor given the video object
* @param video Video
* @param id id
* @return Actor object
@@ -180,7 +177,7 @@ public class YoutubeActivityUtil {
}
/**
- * Gets the common youtube {@link org.apache.streams.pojo.json.Provider} object
+ * Gets the common youtube {@link Provider} object
* @return a provider object representing YouTube
*/
public static Provider getProvider() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
index ea9a49d..e28b4a1 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java
@@ -20,7 +20,6 @@
package com.youtube.serializer;
import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
@@ -33,10 +32,9 @@ import com.google.api.services.youtube.model.ChannelStatistics;
import com.google.api.services.youtube.model.ChannelTopicDetails;
import com.google.api.services.youtube.model.Thumbnail;
import com.google.api.services.youtube.model.ThumbnailDetails;
-import com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -45,7 +43,7 @@ import java.util.List;
public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
@Override
- public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
JsonNode node = jp.getCodec().readTree(jp);
try {
Channel channel = new Channel();
@@ -144,10 +142,9 @@ public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> {
if (node == null) {
return details;
}
- List<String> topicIds = Lists.newLinkedList();
- Iterator<JsonNode> it = node.get("topicIds").iterator();
- while (it.hasNext()) {
- topicIds.add(it.next().asText());
+ List<String> topicIds = new LinkedList<>();
+ for (JsonNode jsonNode : node.get("topicIds")) {
+ topicIds.add(jsonNode.asText());
}
details.setTopicIds(topicIds);
return details;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
index 65a454c..e7645bd 100644
--- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
+++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
+import java.util.Objects;
public class YoutubeEventClassifier {
private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
@@ -39,7 +40,7 @@ public class YoutubeEventClassifier {
* @return Class
*/
public static Class detectClass(String json) {
- Preconditions.checkNotNull(json);
+ Objects.requireNonNull(json);
Preconditions.checkArgument(StringUtils.isNotEmpty(json));
ObjectNode objectNode;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
index 4751f00..0ae4822 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java
@@ -27,13 +27,13 @@ import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrate
import com.google.api.services.youtube.YouTube;
import com.google.api.services.youtube.model.Channel;
import com.google.api.services.youtube.model.ChannelListResponse;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import org.apache.youtube.pojo.YoutubeConfiguration;
import org.junit.Test;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -51,7 +51,7 @@ public class YoutubeChannelDataCollectorTest {
@Test
public void testDataCollector() throws Exception {
YouTube youTube = createMockYoutube();
- BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue();
+ BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
BackOffStrategy strategy = new LinearTimeBackOffStrategy(1);
UserInfo userInfo = new UserInfo();
userInfo.setUserId(ID);
@@ -91,7 +91,7 @@ public class YoutubeChannelDataCollectorTest {
private ChannelListResponse createMockResponse() {
ChannelListResponse response = new ChannelListResponse();
- List<Channel> channelList = Lists.newLinkedList();
+ List<Channel> channelList = new LinkedList<>();
response.setItems(channelList);
Channel channel = new Channel();
channel.setId(ID);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
index 5a2af8a..54c0375 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
@@ -23,15 +23,15 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.google.gplus.configuration.UserInfo;
import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import com.google.api.client.util.Maps;
-import com.google.api.client.util.Sets;
import com.google.api.services.youtube.YouTube;
-import com.google.common.collect.Lists;
import org.apache.youtube.pojo.YoutubeConfiguration;
import org.joda.time.DateTime;
import org.junit.Test;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -54,7 +54,7 @@ public class YoutubeProviderTest {
public void testDataCollectorRunsPerUser() {
Random random = new Random(System.currentTimeMillis());
int numUsers = random.nextInt(1000);
- List<UserInfo> userList = Lists.newLinkedList();
+ List<UserInfo> userList = new LinkedList<>();
for ( int i = 0; i < numUsers; ++i ) {
userList.add(new UserInfo());
@@ -107,7 +107,7 @@ public class YoutubeProviderTest {
provider.setDefaultAfterDate(afterDate);
provider.setDefaultBeforeDate(beforeDate);
- Set<String> users = Sets.newHashSet();
+ Set<String> users = new HashSet<>();
users.add("test_user_1");
users.add("test_user_2");
users.add("test_user_3");
@@ -128,7 +128,7 @@ public class YoutubeProviderTest {
config.setApiKey("API_KEY");
YoutubeProvider provider = buildProvider(config);
- Map<String, DateTime> users = Maps.newHashMap();
+ Map<String, DateTime> users = new HashMap<>();
users.put("user1", new DateTime(System.currentTimeMillis()));
users.put("user3", new DateTime(System.currentTimeMillis()));
users.put("user4", new DateTime(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
index 1870c14..d6a540e 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java
@@ -24,7 +24,6 @@ import org.apache.streams.google.gplus.configuration.UserInfo;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-import com.google.api.client.util.Lists;
import com.google.api.services.youtube.YouTube;
import com.google.api.services.youtube.model.Activity;
import com.google.api.services.youtube.model.ActivityContentDetails;
@@ -41,6 +40,8 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -170,7 +171,7 @@ public class YoutubeUserActivityCollectorTest {
private ActivityListResponse buildActivityListResponse(int num) {
ActivityListResponse activityListResponse = new ActivityListResponse();
- List<Activity> items = Lists.newArrayList();
+ List<Activity> items = new ArrayList<>();
for ( int x = 0; x < num; x++ ) {
Activity activity = new Activity();
@@ -193,21 +194,15 @@ public class YoutubeUserActivityCollectorTest {
private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) {
- YouTube youtube = createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
+ return createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate);
- return youtube;
}
private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) {
YouTube youtube = mock(YouTube.class);
final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before);
- doAnswer(new Answer() {
- @Override
- public YouTube.Videos answer(InvocationOnMock invocationOnMock) throws Throwable {
- return videos;
- }
- }).when(youtube).videos();
+ doAnswer(invocationOnMock -> videos).when(youtube).videos();
return youtube;
}
@@ -245,7 +240,7 @@ public class YoutubeUserActivityCollectorTest {
private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before, boolean page) {
VideoListResponse feed = new VideoListResponse();
- List<Video> list = com.google.common.collect.Lists.newLinkedList();
+ List<Video> list = new LinkedList<>();
for (int i = 0; i < numAfter; ++i) {
com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000);
@@ -255,7 +250,7 @@ public class YoutubeUserActivityCollectorTest {
list.add(video);
}
for (int i = 0; i < numInRange; ++i) {
- DateTime published = null;
+ DateTime published;
if ((before == null && after == null) || before == null) {
published = DateTime.now(); // no date range or end time date range so just make the time now.
} else if (after == null) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
index 1052647..0920829 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
@@ -40,9 +40,9 @@ import java.math.BigInteger;
*/
public interface StreamBuilder extends Serializable {
- public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
+ StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration);
- public StreamsConfiguration getStreamsConfiguration();
+ StreamsConfiguration getStreamsConfiguration();
/**
* Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream.
@@ -53,7 +53,7 @@ public interface StreamBuilder extends Serializable {
* receive data from.
* @return this
*/
- public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
+ StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds);
/**
* Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream.
@@ -64,7 +64,7 @@ public interface StreamBuilder extends Serializable {
* receive data from.
* @return this
*/
- public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
+ StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds);
/**
* Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
@@ -73,7 +73,7 @@ public interface StreamBuilder extends Serializable {
* @param provider provider to execute
* @return this
*/
- public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
+ StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider);
/**
* Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
@@ -82,7 +82,7 @@ public interface StreamBuilder extends Serializable {
* @param provider provider to execute
* @return this
*/
- public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
+ StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider);
/**
* Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
@@ -92,7 +92,7 @@ public interface StreamBuilder extends Serializable {
* @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider#readNext(BigInteger)} method
* @return this
*/
- public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
+ StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence);
/**
* Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute
@@ -104,25 +104,17 @@ public interface StreamBuilder extends Serializable {
* @param end end date
* @return this
*/
- public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
+ StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end);
/**
* Builds the stream, and starts it or submits it based on implementation.
*/
- public void start();
+ void start();
/**
* Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in
* all cases.
*/
- public void stop();
-
-
-
-
-
-
-
-
+ void stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
index 490f454..a86499b 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
@@ -28,19 +28,19 @@ public interface StreamsOperation extends Serializable {
/**
* Each operation must publish an identifier.
*/
- public String getId();
+ String getId();
/**
* This method will be called after initialization/serialization. Initialize any non-serializable objects here.
* @param configurationObject Any object to help initialize the operation. ie. Map, JobContext, Properties, etc. The type
* will be based on where the operation is being run (ie. hadoop, storm, locally, etc.)
*/
- public void prepare(Object configurationObject);
+ void prepare(Object configurationObject);
/**
* No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method
* will be made.
* Use this method to terminate connections, etc.
*/
- public void cleanUp();
+ void cleanUp();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index a797ce5..5b85e68 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -40,11 +40,11 @@ import org.apache.streams.pojo.json.ThroughputQueueBroadcast;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.Lists;
import org.slf4j.Logger;
import java.lang.management.ManagementFactory;
import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -128,7 +128,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
LOGGER.info("BroadcastMonitorThread running");
while (keepRunning) {
try {
- List<String> messages = Lists.newArrayList();
+ List<String> messages = new ArrayList<>();
Set<ObjectName> beans = server.queryNames(null, null);
for (ObjectName name : beans) {
@@ -185,13 +185,16 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
if (broadcastUri != null) {
- if (broadcastUri.getScheme().equals("http")) {
- messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
- } else if (broadcastUri.getScheme().equals("udp")) {
- messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
- } else {
- LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
- throw new RuntimeException();
+ switch (broadcastUri.getScheme()) {
+ case "http":
+ messagePersister = new BroadcastMessagePersister(broadcastUri.toString());
+ break;
+ case "udp":
+ messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString());
+ break;
+ default:
+ LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
+ throw new RuntimeException();
}
} else {
messagePersister = new Slf4jMessagePersister();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
index 3f9a4c1..c85ff90 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
+import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -37,7 +38,7 @@ public class LogstashUdpMessagePersisterTest {
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class);
- DatagramSocket socket = null;
+ private DatagramSocket socket = null;
/**
* setup.
@@ -56,7 +57,7 @@ public class LogstashUdpMessagePersisterTest {
public void testFailedPersist() {
LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789");
- List<String> messageArray = Lists.newArrayList();
+ List<String> messageArray = new ArrayList<>();
for (int x = 0; x < 10; x ++) {
messageArray.add("Fake_message #" + x);
}