You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2020/08/12 16:02:51 UTC
[streams] 01/04: STREAMS-668 upgrade flink examples to latest flink
version
This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git
commit c8f58e2ad23c33092337357854443f1bfeef2606
Author: Steve Blackmon <st...@peoplepatternmacpro.lan>
AuthorDate: Tue Aug 4 14:25:50 2020 -0500
STREAMS-668 upgrade flink examples to latest flink version
STREAMS-668
upgrade flink examples to latest flink version 1.11.1
also improved threading throughout streams-provider-twitter
added accumulators to twitter flink pipelines
---
.../twitter/provider/TwitterFollowingProvider.java | 4 +-
.../twitter/provider/TwitterTimelineProvider.java | 31 +++--
.../provider/TwitterUserInformationProvider.java | 127 +++++++++++++--------
.../TwitterUserInformationProviderTask.java | 29 ++++-
.../flink-twitter-collection/pom.xml | 26 +++--
.../apache/streams/examples/flink/FlinkBase.scala | 13 ++-
.../collection/FlinkTwitterFollowingPipeline.scala | 103 ++++++++---------
.../collection/FlinkTwitterPostsPipeline.scala | 63 +++++++---
.../collection/FlinkTwitterSpritzerPipeline.scala | 58 ++++++----
.../FlinkTwitterUserInformationPipeline.scala | 62 +++++++---
.../FollowingCollectorFlatMapFunction.scala | 52 +++++++++
.../TimelineCollectorFlatMapFunction.scala | 52 +++++++++
.../UserInformationCollectorFlatMapFunction.scala | 48 ++++++++
.../FlinkTwitterFollowingPipelineFollowersIT.conf | 3 +-
.../FlinkTwitterFollowingPipelineFriendsIT.conf | 1 +
.../resources/FlinkTwitterPostsPipelineIT.conf | 1 +
.../FlinkTwitterUserInformationPipelineIT.conf | 9 +-
.../src/test/resources/testng.xml | 54 +++++++++
.../FlinkTwitterFollowingPipelineFollowersIT.scala | 2 +-
.../FlinkTwitterFollowingPipelineFriendsIT.scala | 4 +-
.../twitter/test/FlinkTwitterPostsPipelineIT.scala | 4 +-
.../FlinkTwitterUserInformationPipelineIT.scala | 4 +-
22 files changed, 562 insertions(+), 188 deletions(-)
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4603803..eb472ea 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -332,6 +332,8 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
}
public boolean isRunning() {
+ LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+ LOGGER.debug("providerQueue.size: {}", providerQueue.size());
LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
LOGGER.debug("tasks.size(): {}", tasks.size());
@@ -346,7 +348,7 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str
allTasksComplete = false;
}
LOGGER.debug("allTasksComplete: {}", allTasksComplete);
- boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+ boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
LOGGER.debug("finished: {}", finished);
if ( finished ) {
running.set(false);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index eee81c4..126683c 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -206,6 +206,7 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
}
Objects.requireNonNull(providerQueue);
+ Objects.requireNonNull(config);
Objects.requireNonNull(config.getOauth().getConsumerKey());
Objects.requireNonNull(config.getOauth().getConsumerSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
@@ -244,6 +245,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
)
);
+ Objects.requireNonNull(executor);
+
submitTimelineThreads(ids, names);
LOGGER.info("tasks: {}", tasks.size());
@@ -323,6 +326,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
running.set(false);
LOGGER.info("Exiting");
+ } else {
+ LOGGER.info("Not Finished Yet...");
}
return result;
@@ -350,25 +355,37 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea
}
@Override
- public void cleanUp() {
- ExecutorUtils.shutdownAndAwaitTermination(executor);
- }
-
- @Override
public boolean isRunning() {
LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+ LOGGER.debug("executor.isShutdown: {}", executor.isShutdown());
LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
LOGGER.debug("tasks.size(): {}", tasks.size());
LOGGER.debug("futures.size(): {}", futures.size());
- if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) {
+ boolean allTasksComplete;
+ if( futures.size() > 0) {
+ allTasksComplete = true;
+ for(Future<?> future : futures){
+ allTasksComplete &= future.isDone(); // true only while every future is done ('|=' with an initial value of true can never become false)
+ }
+ } else {
+ allTasksComplete = false;
+ }
+ LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+ boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0;
+ LOGGER.debug("finished: {}", finished);
+ if ( finished ) {
running.set(false);
}
- LOGGER.debug("isRunning: ", running.get());
return running.get();
}
@Override
+ public void cleanUp() {
+ ExecutorUtils.shutdownAndAwaitTermination(executor);
+ }
+
+ @Override
public Iterator<Tweet> call() {
prepare(config);
startStream();
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 6ceeb88..dc0d050 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -40,6 +40,7 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
@@ -56,6 +57,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
+import java.lang.Runnable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
@@ -64,6 +66,8 @@ import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
@@ -103,14 +107,15 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
protected Twitter client;
- protected ListeningExecutorService executor;
+ protected ExecutorService executor;
protected DateTime start;
protected DateTime end;
protected final AtomicBoolean running = new AtomicBoolean();
- private List<ListenableFuture<Object>> futures = new ArrayList<>();
+ private List<Runnable> tasks = new ArrayList<>();
+ private List<Future<Object>> futures = new ArrayList<>();
/**
* To use from command line:
@@ -165,10 +170,11 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
System.err.println(ex.getMessage());
}
}
+ outStream.flush();
}
while ( provider.isRunning());
provider.cleanUp();
- outStream.flush();
+ outStream.close();
}
// TODO: this should be abstracted out
@@ -199,9 +205,19 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
public void prepare(Object configurationObject) {
if ( configurationObject instanceof TwitterFollowingConfiguration ) {
- config = (TwitterUserInformationConfiguration) configurationObject;
+ this.config = (TwitterUserInformationConfiguration) configurationObject;
}
+ streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
+
+ try {
+ lock.writeLock().lock();
+ providerQueue = QueueUtils.constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ Objects.requireNonNull(providerQueue);
Objects.requireNonNull(config);
Objects.requireNonNull(config.getOauth());
Objects.requireNonNull(config.getOauth().getConsumerKey());
@@ -211,10 +227,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
Objects.requireNonNull(config.getInfo());
Objects.requireNonNull(config.getThreadsPerProvider());
- streamsConfiguration = StreamsConfigurator.detectConfiguration();
-
- Objects.requireNonNull(streamsConfiguration.getQueueSize());
-
try {
client = getTwitterClient();
} catch (InstantiationException e) {
@@ -223,15 +235,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
Objects.requireNonNull(client);
- try {
- lock.writeLock().lock();
- providerQueue = QueueUtils.constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- Objects.requireNonNull(providerQueue);
-
for (String s : config.getInfo()) {
if (s != null) {
String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -247,7 +250,7 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
}
executor = MoreExecutors.listeningDecorator(
- ExecutorUtils.newFixedThreadPoolWithQueueSize(
+ ExecutorUtils.newFixedThreadPoolWithQueueSize(
config.getThreadsPerProvider().intValue(),
streamsConfiguration.getQueueSize().intValue()
)
@@ -255,12 +258,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
Objects.requireNonNull(executor);
- // Twitter allows for batches up to 100 per request, but you cannot mix types
submitUserInformationThreads(ids, names);
+
+ LOGGER.info("tasks: {}", tasks.size());
+ LOGGER.info("futures: {}", futures.size());
+
+ }
+
+ @Override
+ public void startStream() {
+
+ Objects.requireNonNull(executor);
+
+ LOGGER.info("startStream");
+
+ running.set(true);
+
+ LOGGER.info("running: {}", running.get());
+
+ ExecutorUtils.shutdownAndAwaitTermination(executor);
+
+ LOGGER.info("running: {}", running.get());
+
}
protected void submitUserInformationThreads(List<Long> ids, List<String> names) {
+ /*
+ while( idsIndex < ids.size() ) {
+ from = idsIndex
+ to = Math.min( idsIndex + 100, ids.size() - idsIndex
+ }
+ */
int idsIndex = 0;
while( idsIndex + 100 < ids.size() ) {
List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
@@ -268,7 +297,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
this,
client,
new UsersLookupRequest().withUserId(batchIds));
- ListenableFuture future = executor.submit(providerTask);
+ tasks.add(providerTask);
+ Future future = executor.submit((Callable)providerTask);
futures.add(future);
LOGGER.info("Thread Submitted: {}", providerTask.request);
idsIndex += 100;
@@ -279,7 +309,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
this,
client,
new UsersLookupRequest().withUserId(batchIds));
- ListenableFuture future = executor.submit(providerTask);
+ tasks.add(providerTask);
+ Future future = executor.submit((Callable)providerTask);
futures.add(future);
LOGGER.info("Thread Submitted: {}", providerTask.request);
}
@@ -291,7 +322,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
this,
client,
new UsersLookupRequest().withScreenName(batchNames));
- ListenableFuture future = executor.submit(providerTask);
+ tasks.add(providerTask);
+ Future future = executor.submit((Callable)providerTask);
futures.add(future);
LOGGER.info("Thread Submitted: {}", providerTask.request);
namesIndex += 100;
@@ -302,7 +334,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
this,
client,
new UsersLookupRequest().withScreenName(batchNames));
- ListenableFuture future = executor.submit(providerTask);
+ tasks.add(providerTask);
+ Future future = executor.submit((Callable)providerTask);
futures.add(future);
LOGGER.info("Thread Submitted: {}", providerTask.request);
}
@@ -310,27 +343,16 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
}
@Override
- public void startStream() {
-
- Objects.requireNonNull(executor);
-
- LOGGER.info("startStream: {} Threads", futures.size());
-
- running.set(true);
-
- executor.shutdown();
- }
-
- @Override
public StreamsResultSet readCurrent() {
StreamsResultSet result;
+ LOGGER.debug("Providing {} docs", providerQueue.size());
+
try {
lock.writeLock().lock();
result = new StreamsResultSet(providerQueue);
providerQueue = QueueUtils.constructQueue();
- LOGGER.debug("readCurrent: {} Documents", result.size());
} finally {
lock.writeLock().unlock();
}
@@ -347,27 +369,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>,
@Override
public StreamsResultSet readRange(DateTime start, DateTime end) {
LOGGER.debug("{} readRange", STREAMS_ID);
- this.start = start;
- this.end = end;
- readCurrent();
- return (StreamsResultSet)providerQueue.iterator();
+ throw new NotImplementedException();
}
+ protected Twitter getTwitterClient() throws InstantiationException {
+ return Twitter.getInstance(config);
+ }
@Override
public boolean isRunning() {
- if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
- LOGGER.info("All Threads Completed");
+ LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty());
+ LOGGER.debug("providerQueue.size: {}", providerQueue.size());
+ LOGGER.debug("executor.isTerminated: {}", executor.isTerminated());
+ LOGGER.debug("tasks.size(): {}", tasks.size());
+ LOGGER.debug("futures.size(): {}", futures.size());
+ boolean allTasksComplete;
+ if( futures.size() > 0) {
+ allTasksComplete = true;
+ for(Future<?> future : futures){
+ allTasksComplete &= future.isDone(); // true only while every future is done ('|=' with an initial value of true can never become false)
+ }
+ } else {
+ allTasksComplete = false;
+ }
+ LOGGER.debug("allTasksComplete: {}", allTasksComplete);
+ boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated();
+ LOGGER.debug("finished: {}", finished);
+ if ( finished ) {
running.set(false);
- LOGGER.info("Exiting");
}
return running.get();
}
- protected Twitter getTwitterClient() throws InstantiationException {
- return Twitter.getInstance(config);
- }
-
@Override
public void cleanUp() {
ExecutorUtils.shutdownAndAwaitTermination(executor);
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
index 418ba83..40ebfcf 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -23,6 +23,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.api.Twitter;
import org.apache.streams.twitter.api.UsersLookupRequest;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.util.ComponentUtils;
@@ -31,14 +32,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Retrieve recent posts for a single user id.
*/
-public class TwitterUserInformationProviderTask implements Runnable {
+public class TwitterUserInformationProviderTask implements Callable<Iterator<User>>, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
@@ -47,6 +51,7 @@ public class TwitterUserInformationProviderTask implements Runnable {
protected TwitterUserInformationProvider provider;
protected Twitter client;
protected UsersLookupRequest request;
+ protected List<User> responseList;
/**
* TwitterTimelineProviderTask constructor.
@@ -60,19 +65,37 @@ public class TwitterUserInformationProviderTask implements Runnable {
this.request = request;
}
+ int item_count = 0;
+
@Override
public void run() {
LOGGER.info("Thread Starting: {}", request.toString());
+ responseList = new ArrayList<>();
+
List<User> users = client.lookup(request);
- for (User user : users) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+ responseList.addAll(users);
+
+ item_count = 0; // assign the field: a local 'int item_count' would shadow it and the log below would always print 0
+
+ if( users.size() > 0 ) {
+ for (User user : users) {
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue); item_count++;
+ LOGGER.debug("User: {}", user.getIdStr());
+ }
}
LOGGER.info("Thread Finished: {}", request.toString());
+ LOGGER.info("item_count: {} ", item_count);
+
}
+ @Override
+ public Iterator<User> call() throws Exception {
+ run();
+ return responseList.iterator();
+ }
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
index f39ae0a..1e2de10 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml
@@ -34,7 +34,7 @@
<properties>
<testng.version>6.9.10</testng.version>
<hdfs.version>2.7.0</hdfs.version>
- <flink.version>1.4.2</flink.version>
+ <flink.version>1.11.1</flink.version>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<scalapc.version>1.1.0</scalapc.version>
@@ -127,6 +127,18 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-httpfs</artifactId>
+ <type>war</type>
+ <version>${hdfs.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
@@ -433,15 +445,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
- <!-- Run integration test suite rather than individual tests. -->
- <excludes>
- <exclude>**/*Test.java</exclude>
- <exclude>**/*Tests.java</exclude>
- </excludes>
- <includes>
- <include>**/*IT.java</include>
- <include>**/*ITs.java</include>
- </includes>
+ <suiteXmlFiles>
+ <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile>
+ </suiteXmlFiles>
</configuration>
<dependencies>
<dependency>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 1709d4c..3acca05 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -66,7 +66,7 @@ trait FlinkBase {
if(StringUtils.isNotEmpty(configUrl)) {
BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl))
try {
- typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.getConfig).resolve()
+ typesafe = StreamsConfigurator.resolveConfig(configUrl)
} catch {
case mue: MalformedURLException => {
BASELOGGER.error("Invalid Configuration URL: ", mue)
@@ -77,6 +77,7 @@ trait FlinkBase {
return false
}
}
+ StreamsConfigurator.addConfig(typesafe)
}
else {
typesafe = StreamsConfigurator.getConfig
@@ -177,9 +178,10 @@ trait FlinkBase {
}
else if (configObject.getScheme.toString.equals("s3")) {
inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath
- } else {
- throw new Exception("scheme not recognized: " + configObject.getScheme)
}
+// else {
+// throw new Exception("scheme not recognized: " + configObject.getScheme)
+// }
inPathBuilder
}
@@ -193,9 +195,10 @@ trait FlinkBase {
}
else if( configObject.getScheme.toString.equals("s3")) {
outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath
- } else {
- throw new Exception("output scheme not recognized: " + configObject.getScheme)
}
+// else {
+// throw new Exception("output scheme not recognized: " + configObject.getScheme)
+// }
outPathBuilder
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index bbf83f8..bae988c 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -19,32 +19,28 @@
package org.apache.streams.examples.flink.twitter.collection
import java.util.Objects
-import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
-import com.google.common.util.concurrent.Uninterruptibles
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.config.TwitterFollowingConfiguration
import org.apache.streams.twitter.pojo.Follow
-import org.apache.streams.twitter.provider.TwitterFollowingProvider
import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
/**
* FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
@@ -58,8 +54,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
override def main(args: Array[String]) = {
- super.main(args)
- val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+ if( args.length > 0 ) {
+ LOGGER.info("Args: {}", args)
+ configUrl = args(0)
+ }
+ if( !setup(configUrl) ) System.exit(1)
+ val jobConfig = new StreamsConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()
if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
val thread = new Thread(pipeline)
@@ -119,6 +119,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
val env: StreamExecutionEnvironment = streamEnvironment(config)
+ env.setParallelism(streamsConfig.getParallelism().toInt)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
@@ -126,54 +127,54 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
val outPath = buildWriterPath(config.getDestination)
- val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+ val ids: DataStream[String] = env.readTextFile(inPath)
+
+ val keyed_ids: KeyedStream[String, Int] = ids.
+ name("keyed_ids").
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
// these datums contain 'Follow' objects
- val followDatums: DataStream[StreamsDatum] =
- keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+ val followDatums: DataStream[StreamsDatum] = keyed_ids.
+ flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)).
+ name("followDatums").
+ setParallelism(streamsConfig.getParallelism().toInt)
- val follows: DataStream[Follow] = followDatums
+ val follows: DataStream[Follow] = followDatums.
+ name("follows")
.map(datum => datum.getDocument.asInstanceOf[Follow])
- val jsons: DataStream[String] = follows
+ val jsons: DataStream[String] = follows.
+ name("jsons")
.map(follow => {
val MAPPER = StreamsJacksonMapper.getInstance
MAPPER.writeValueAsString(follow)
- })
-
- if( config.getTest == false )
- jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3)
- else
- jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ }).
+ setParallelism(streamsConfig.getParallelism().toInt)
+
+ val keyed_jsons: KeyedStream[String, Int] = jsons.
+ name("keyed_jsons").
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+ val fileSink : StreamingFileSink[String] = StreamingFileSink.
+ forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+ build()
+
+ if( config.getTest == true ) {
+ keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ } else {
+ keyed_jsons.name("fileSink").addSink(fileSink)
+ }
- // if( test == true ) jsons.print();
+ val result: JobExecutionResult = env.execute("FlinkTwitterFollowingPipeline")
- env.execute(STREAMS_ID)
- }
+ LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
- class FollowingCollectorFlatMapFunction(
- twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
- flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
- ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+ LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
- override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
- collectConnections(input, out)
- }
+ LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
- def collectConnections(id : String, out : Collector[StreamsDatum]) = {
- val twitProvider: TwitterFollowingProvider =
- new TwitterFollowingProvider(
- twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
- )
- twitProvider.prepare(twitProvider)
- twitProvider.startStream()
- var iterator: Iterator[StreamsDatum] = null
- do {
- Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
- twitProvider.readCurrent().iterator().toList.map(out.collect(_))
- } while( twitProvider.isRunning )
- }
}
}
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 28d2576..14eebc5 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -24,24 +24,31 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.pojo.Tweet
import org.apache.streams.twitter.provider.TwitterTimelineProvider
import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
@@ -57,8 +64,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
override def main(args: Array[String]) = {
- super.main(args)
- val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+ if( args.length > 0 ) {
+ LOGGER.info("Args: {}", args)
+ configUrl = args(0)
+ }
+ if( !setup(configUrl) ) System.exit(1)
+ val jobConfig = new StreamsConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectCustomConfiguration()
if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
val thread = new Thread(pipeline)
@@ -118,6 +129,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val env: StreamExecutionEnvironment = streamEnvironment(config)
+ env.setParallelism(streamsConfig.getParallelism().toInt)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
@@ -125,13 +137,15 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
- val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+ val ids: DataStream[String] = env.readTextFile(inPath).name("ids")
- val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
+ val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
// these datums contain 'Tweet' objects
val tweetDatums: DataStream[StreamsDatum] =
- keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums")
+ keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums")
val tweets: DataStream[Tweet] = tweetDatums
.map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets")
@@ -142,15 +156,28 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
MAPPER.writeValueAsString(tweet)
}).name("json")
- if( config.getTest == false )
- jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
- else
- jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ val keyed_jsons: KeyedStream[String, Int] = jsons.
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+ val fileSink : StreamingFileSink[String] = StreamingFileSink.
+ forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+ build()
+
+ if( config.getTest == true ) {
+ keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ } else {
+ keyed_jsons.addSink(fileSink)
+ }
+
+ val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline")
+
+ LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+ LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
- // if( test == true ) jsons.print();
+ LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
- env.execute(STREAMS_ID)
}
class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 79c95d6..6e38591 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,26 +25,32 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
import org.apache.streams.hdfs.HdfsWriterConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.config.TwitterStreamConfiguration
import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import org.apache.streams.twitter.provider.TwitterStreamProvider
import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
@@ -122,25 +128,37 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
- val streamSource : DataStream[String] = env.addSource(spritzerSource)
+ val jsons : DataStream[String] = env.addSource(spritzerSource)
- if( config.getTest == false )
- streamSource.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
- else
- streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ val keyed_jsons: KeyedStream[String, Int] = jsons.
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
- // if( test == true ) jsons.print();
+ val fileSink : StreamingFileSink[String] = StreamingFileSink.
+ forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+ build()
- env.execute(STREAMS_ID)
+ if( config.getTest == true ) {
+ keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ } else {
+ keyed_jsons.addSink(fileSink)
+ }
+
+ val result: JobExecutionResult = env.execute("FlinkTwitterSpritzerPipeline")
+
+ LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+ LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+ LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
}
def stop(): Unit = {
- spritzerSource.stop()
+ spritzerSource.cancel()
}
- class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction {
+ class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ {
var mapper: ObjectMapper = _
@@ -167,9 +185,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
close()
}
- override def stop(): Unit = {
- close()
- }
+// override def stop(): Unit = {
+// close()
+// }
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 977fdf5..32cf232 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -24,26 +24,34 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.commons.lang3.StringUtils
+import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.WindowedStream
import org.apache.flink.streaming.api.scala.function.WindowFunction
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.util.Collector
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
-import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration}
+import org.apache.streams.hdfs.HdfsReaderConfiguration
+import org.apache.streams.hdfs.HdfsWriterConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.pojo.User
import org.apache.streams.twitter.provider.TwitterUserInformationProvider
import org.hamcrest.MatcherAssert
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
@@ -59,8 +67,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
override def main(args: Array[String]) = {
- super.main(args)
- val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+ if( args.length > 0 ) {
+ LOGGER.info("Args: {}", args)
+ configUrl = args(0)
+ }
+ if( !setup(configUrl) ) System.exit(1)
+ val jobConfig = new StreamsConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()
if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
val thread = new Thread(pipeline)
@@ -115,11 +127,13 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new StreamsConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
import FlinkTwitterUserInformationPipeline._
+ //import FlinkBase.streamsConfig
override def run(): Unit = {
val env: StreamExecutionEnvironment = streamEnvironment(config)
+ env.setParallelism(streamsConfig.getParallelism().toInt)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
@@ -127,7 +141,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration())
- val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
+ val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids")
val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
@@ -135,7 +149,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists")
- val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums")
+ val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums")
val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users")
@@ -145,15 +159,29 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
MAPPER.writeValueAsString(user)
}).name("jsons")
- if( config.getTest == false )
- jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs")
- else
- jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ val keyed_jsons: KeyedStream[String, Int] = jsons.
+ setParallelism(streamsConfig.getParallelism().toInt).
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs )
+
+ val fileSink : StreamingFileSink[String] = StreamingFileSink.
+ forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
+ build()
+
+ if( config.getTest == true ) {
+ keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ } else {
+ keyed_jsons.addSink(fileSink)
+ }
+
+ val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline")
+
+ LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult)
+
+ LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime())
+
+ LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults()))
- LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
- env.execute(STREAMS_ID)
}
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..1223047
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.ComponentConfigurator
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterFollowingConfiguration
+import org.apache.streams.twitter.provider.TwitterFollowingProvider
+
+import scala.collection.JavaConversions._
+
+class FollowingCollectorFlatMapFunction(
+ streamsConfiguration : StreamsConfiguration,
+ twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(),
+ flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration()
+ ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
+
+ var size : IntCounter = new IntCounter()
+ var counter : IntCounter = new IntCounter()
+
+ override def open(parameters: Configuration): Unit = {
+ getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", this.size)
+ getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", this.counter)
+ }
+
+ override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+ size.add(input.size)
+ collectConnections(input, out)
+ }
+
+ def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+ val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+ val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf)
+ twitProvider.prepare(conf)
+ twitProvider.startStream()
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ val current = twitProvider.readCurrent().iterator().toList
+ counter.add(current.size)
+ current.map(out.collect(_))
+ } while( twitProvider.isRunning )
+ }
+
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..8f21145
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala
@@ -0,0 +1,52 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 6/2/16.
+ */
+class TimelineCollectorFlatMapFunction(
+ streamsConfiguration : StreamsConfiguration,
+ twitterConfiguration : TwitterTimelineProviderConfiguration,
+ streamsFlinkConfiguration : StreamsFlinkConfiguration
+ ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+ var size : IntCounter = new IntCounter()
+ var counter : IntCounter = new IntCounter()
+ override def open(parameters: Configuration): Unit = {
+ getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", this.size)
+ getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", this.counter)
+ }
+ override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+ size.add(input.size)
+ collectPosts(input, out)
+ }
+ def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = {
+ try {
+ val conf = twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration]
+ val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf)
+ twitProvider.prepare(conf)
+ twitProvider.startStream()
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ val current = twitProvider.readCurrent().iterator().toList
+ counter.add(current.size)
+ current.map(out.collect(_))
+ } while( twitProvider.isRunning )
+ } finally {
+
+ }
+ }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
new file mode 100644
index 0000000..46e0d4a
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala
@@ -0,0 +1,48 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.api.common.accumulators.IntCounter
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.apache.streams.config.StreamsConfiguration
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.flink.StreamsFlinkConfiguration
+import org.apache.streams.twitter.config.TwitterUserInformationConfiguration
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 6/2/16.
+ */
+class UserInformationCollectorFlatMapFunction(
+ streamsConfiguration : StreamsConfiguration,
+ twitterConfiguration : TwitterUserInformationConfiguration,
+ streamsFlinkConfiguration : StreamsFlinkConfiguration
+ ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable {
+ var size : IntCounter = new IntCounter()
+ var counter : IntCounter = new IntCounter()
+ override def open(parameters: Configuration): Unit = {
+ getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size", this.size)
+ getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter", this.counter)
+ }
+ override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = {
+ size.add(input.size)
+ collectProfiles(input, out)
+ }
+ def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = {
+ val conf = twitterConfiguration.withInfo(ids)
+ val twitProvider: TwitterUserInformationProvider = new TwitterUserInformationProvider(conf)
+ twitProvider.prepare(conf)
+ twitProvider.startStream()
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ val current = twitProvider.readCurrent().iterator().toList
+ counter.add(current.size)
+ current.map(out.collect(_))
+ } while( twitProvider.isRunning )
+ }
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
index 67f7c9c..ec4dc4e 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -32,9 +32,10 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
max_items = 5000
}
org.apache.streams.config.StreamsConfiguration {
+ parallelism = 1
providerWaitMs = 1000
}
org.apache.streams.flink.StreamsFlinkConfiguration {
local = true
test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
index 724cd43..fd4b1db 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -31,6 +31,7 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration {
ids_only = true
}
org.apache.streams.config.StreamsConfiguration {
+ parallelism = 1
providerWaitMs = 1000
}
org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
index 626d8f6..8f5aadf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -27,6 +27,7 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
writerPath = "FlinkTwitterPostsPipelineIT"
}
org.apache.streams.config.StreamsConfiguration {
+ parallelism = 1
providerWaitMs = 1000
}
org.apache.streams.flink.StreamsFlinkConfiguration {
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
index ea8125a..cbdb650 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -28,8 +28,15 @@ org.apache.streams.hdfs.HdfsWriterConfiguration {
}
org.apache.streams.config.StreamsConfiguration {
providerWaitMs = 1000
+ queueSize = 10000
+ batchSize = 1000
+ identifier = "FlinkTwitterUserInformationPipelineIT"
+ parallelism = 1
+ providerTimeoutMs = 60000
+ shutdownCheckDelay = 30000
+ shutdownCheckInterval = 30000
}
org.apache.streams.flink.StreamsFlinkConfiguration {
local = true
test = true
-}
\ No newline at end of file
+}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
new file mode 100644
index 0000000..bdb250d
--- /dev/null
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<suite name="ExampleFlinkITs" preserve-order="true">
+
+ <test name="FlinkTwitterUserInformationPipelineIT">
+ <classes>
+ <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterUserInformationPipelineIT" />
+ </classes>
+ </test>
+
+ <test name="FlinkTwitterPostsPipelineIT">
+ <classes>
+ <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterPostsPipelineIT" />
+ </classes>
+ </test>
+
+ <test name="FlinkTwitterFollowingPipelineFriendsIT">
+ <classes>
+ <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFriendsIT" />
+ </classes>
+ </test>
+
+ <test name="FlinkTwitterFollowingPipelineFollowersIT">
+ <classes>
+ <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFollowersIT" />
+ </classes>
+ </test>
+
+ <test name="FlinkTwitterSpritzerPipelineIT">
+ <classes>
+ <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterSpritzerPipelineIT" />
+ </classes>
+ </test>
+
+</suite>
\ No newline at end of file
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 617815e..bfc6940 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -36,7 +36,7 @@ import scala.io.Source
/**
* FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
*/
-class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFollowersIT {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT])
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 5ebea94..6d52d44 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -16,7 +16,7 @@
* under the License.
*/
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
import java.io.File
import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
/**
* FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
*/
-class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
+class FlinkTwitterFollowingPipelineFriendsIT {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT])
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index ed3edf3..a43c758 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -16,7 +16,7 @@
* under the License.
*/
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
import java.io.File
import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
/**
* FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
*/
-class FlinkTwitterPostsPipelineIT extends FlatSpec {
+class FlinkTwitterPostsPipelineIT {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 0652bc1..48b876a 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -16,7 +16,7 @@
* under the License.
*/
-package com.peoplepattern.streams.twitter.collection
+package org.apache.streams.examples.flink.twitter.test
import java.io.File
import java.nio.file.{Files, Paths}
@@ -36,7 +36,7 @@ import scala.io.Source
/**
* FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
*/
-class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
+class FlinkTwitterUserInformationPipelineIT {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])