You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 13:29:27 UTC
[1/2] flink git commit: [FLINK-8167] [connector-wikiedits] Harden
WikipediaEditsSource
Repository: flink
Updated Branches:
refs/heads/master 80348d653 -> e7ca1055a
[FLINK-8167] [connector-wikiedits] Harden WikipediaEditsSource
- Minor eager sanity checks
- Use UUID suffix for nickname. As reported in FLINK-8167, the current
nickname suffix can result in nickname clashes which lead to test
failures.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec0c416a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec0c416a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec0c416a
Branch: refs/heads/master
Commit: ec0c416a69398c9ef3bfd2be51d0dc6677e19f7b
Parents: 80348d6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 29 16:28:18 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:28:51 2017 +0100
----------------------------------------------------------------------
.../wikiedits/WikipediaEditEventIrcStream.java | 44 +++++++++----------
.../wikiedits/WikipediaEditsSource.java | 46 ++++++++------------
2 files changed, 41 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec0c416a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
index 81dfa7b..84d89dd 100644
--- a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
+++ b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
@@ -26,10 +26,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-class WikipediaEditEventIrcStream {
+class WikipediaEditEventIrcStream implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditEventIrcStream.class);
@@ -37,11 +39,11 @@ class WikipediaEditEventIrcStream {
private final BlockingQueue<WikipediaEditEvent> edits =
new ArrayBlockingQueue<>(128);
- /** IRC connection (Thread). */
+ /** IRC connection (NOTE: this is a separate Thread). */
private IRCConnection conn;
WikipediaEditEventIrcStream(String host, int port) {
- final String nick = "flink-bot-" + (int) (Math.random() * 1000);
+ final String nick = "flink-bot-" + UUID.randomUUID().toString();
this.conn = new IRCConnection(host, new int[] { port}, "", nick, nick, nick);
conn.addIRCEventListener(new WikipediaIrcChannelListener(edits));
conn.setEncoding("UTF-8");
@@ -51,25 +53,18 @@ class WikipediaEditEventIrcStream {
conn.setName("WikipediaEditEventIrcStreamThread");
}
- void start() throws IOException {
- if (!conn.isConnected()) {
- conn.connect();
- }
+ BlockingQueue<WikipediaEditEvent> getEdits() {
+ return edits;
}
- void stop() throws InterruptedException {
- if (conn.isConnected()) {
+ void connect() throws IOException {
+ if (!conn.isConnected()) {
+ conn.connect();
}
-
- conn.interrupt();
- conn.join(5 * 1000);
- }
-
- BlockingQueue<WikipediaEditEvent> getEdits() {
- return edits;
}
void join(String channel) {
+ Objects.requireNonNull(channel, "channel");
conn.send("JOIN " + channel);
}
@@ -77,6 +72,15 @@ class WikipediaEditEventIrcStream {
conn.send("PART " + channel);
}
+ @Override
+ public void close() throws Exception {
+ if (conn != null && conn.isConnected()) {
+ conn.doQuit();
+ conn.close();
+ conn.join(5 * 1000);
+ }
+ }
+
// ------------------------------------------------------------------------
// IRC channel listener
// ------------------------------------------------------------------------
@@ -85,12 +89,8 @@ class WikipediaEditEventIrcStream {
private final BlockingQueue<WikipediaEditEvent> edits;
- public WikipediaIrcChannelListener(BlockingQueue<WikipediaEditEvent> edits) {
- if (edits == null) {
- throw new NullPointerException();
- }
-
- this.edits = edits;
+ WikipediaIrcChannelListener(BlockingQueue<WikipediaEditEvent> edits) {
+ this.edits = Objects.requireNonNull(edits, "edits");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ec0c416a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
index a10abdb..4498beb 100644
--- a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
+++ b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.connectors.wikiedits;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@@ -44,8 +44,6 @@ public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent>
private volatile boolean isRunning = true;
- private WikipediaEditEventIrcStream ircStream;
-
/**
* Creates a source reading {@link WikipediaEditEvent} instances from the
* IRC channel <code>#en.wikipedia</code>.
@@ -72,33 +70,27 @@ public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent>
public WikipediaEditsSource(String host, int port, String channel) {
this.host = host;
this.port = port;
- this.channel = channel;
+ this.channel = Objects.requireNonNull(channel);
}
@Override
- public void open(Configuration parameters) throws Exception {
- ircStream = new WikipediaEditEventIrcStream(host, port);
- ircStream.start();
- ircStream.join(channel);
- }
-
- @Override
- public void close() throws Exception {
- if (ircStream != null) {
- ircStream.leave(channel);
- ircStream.stop();
- }
- }
-
- @Override
- public void run(SourceContext ctx) throws Exception {
- while (isRunning) {
- // Query for the next edit event
- WikipediaEditEvent edit = ircStream.getEdits()
- .poll(100, TimeUnit.MILLISECONDS);
-
- if (edit != null) {
- ctx.collect(edit);
+ public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {
+ try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {
+ // Open connection and join channel
+ ircStream.connect();
+ ircStream.join(channel);
+
+ try {
+ while (isRunning) {
+ // Query for the next edit event
+ WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
+
+ if (edit != null) {
+ ctx.collect(edit);
+ }
+ }
+ } finally {
+ ircStream.leave(channel);
}
}
}
[2/2] flink git commit: [FLINK-7762] [connector-wikiedits] Make
WikipediaEditsSourceTest proper test
Posted by ch...@apache.org.
[FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test
The WikipediaEditsSourceTest unnecessarily implements an integration
test that starts a FlinkMiniCluster and executes a small Flink program.
The new test simply creates a source and executes its run method in a separate
thread until a single WikipediaEditEvent is received.
This closes #5102.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7ca1055
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7ca1055
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7ca1055
Branch: refs/heads/master
Commit: e7ca1055afc46e4a6ccb3492758de36bfd6419a3
Parents: ec0c416
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 29 16:36:29 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:29:01 2017 +0100
----------------------------------------------------------------------
flink-contrib/flink-connector-wikiedits/pom.xml | 7 +-
.../wikiedits/WikipediaEditsSourceTest.java | 172 +++++++++++++------
2 files changed, 125 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index c13f93e..3908658 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -40,11 +40,16 @@ under the License.
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
-
<dependency>
<groupId>org.schwering</groupId>
<artifactId>irclib</artifactId>
<version>1.10</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
index 439aa36..f6fa8e0 100644
--- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
+++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
@@ -18,19 +18,29 @@
package org.apache.flink.streaming.connectors.wikiedits;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -40,71 +50,127 @@ public class WikipediaEditsSourceTest {
private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditsSourceTest.class);
+ @Rule
+ public RetryRule retryRule = new RetryRule();
+
/**
- * NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test.
- *
- * <p>We first check the connection to the IRC server. If it fails, this test
- * is effectively ignored.
- *
- * @see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html">Socks Proxy</a>
+ * We first check the connection to the IRC server. If it fails, this test is ignored.
*/
- @Test(timeout = 120 * 1000)
+ @Test
+ @RetryOnFailure(times = 1)
public void testWikipediaEditsSource() throws Exception {
- final int numRetries = 5;
- final int waitBetweenRetriesMillis = 2000;
- final int connectTimeout = 1000;
+ if (canConnect(1, TimeUnit.SECONDS)) {
+ final Time testTimeout = Time.seconds(60);
+ final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();
+
+ ExecutorService executorService = null;
+ try {
+ executorService = Executors.newSingleThreadExecutor();
+ BlockingQueue<Object> collectedEvents = new ArrayBlockingQueue<>(1);
+ AtomicReference<Exception> asyncError = new AtomicReference<>();
+
+ // Execute the source in a different thread and collect events into the queue.
+ // We do this in a separate thread in order to not block the main test thread
+ // indefinitely in case that something bad happens (like not receiving any
+ // events)
+ executorService.execute(() -> {
+ try {
+ wikipediaEditsSource.run(new CollectingSourceContext<>(collectedEvents));
+ } catch (Exception e) {
+ boolean interrupted = e instanceof InterruptedException;
+ if (!interrupted) {
+ LOG.warn("Failure in WikipediaEditsSource", e);
+ }
+
+ asyncError.compareAndSet(null, e);
+ }
+ });
+
+ long deadline = deadlineNanos(testTimeout);
- boolean success = false;
+ Object event = null;
+ Exception error = null;
- for (int i = 0; i < numRetries && !success; i++) {
- // Check connection
- boolean canConnect = false;
+ // Check event or error
+ while (event == null && error == null && System.nanoTime() < deadline) {
+ event = collectedEvents.poll(1, TimeUnit.SECONDS);
+ error = asyncError.get();
+ }
+
+ if (error != null) {
+ // We don't use assertNull, because we want to include the error message
+ fail("Failure in WikipediaEditsSource: " + error.getMessage());
+ }
- String host = WikipediaEditsSource.DEFAULT_HOST;
- int port = WikipediaEditsSource.DEFAULT_PORT;
+ assertNotNull("Did not receive a WikipediaEditEvent within the desired timeout", event);
+ assertTrue("Received unexpected event " + event, event instanceof WikipediaEditEvent);
+ } finally {
+ wikipediaEditsSource.cancel();
- try (Socket s = new Socket()) {
- s.connect(new InetSocketAddress(host, port), connectTimeout);
- canConnect = s.isConnected();
- } catch (Throwable ignored) {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService.awaitTermination(1, TimeUnit.SECONDS);
+ }
}
+ } else {
+ LOG.info("Skipping test, because not able to connect to IRC server.");
+ }
+ }
- if (canConnect) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().disableSysoutLogging();
+ private long deadlineNanos(Time testTimeout) {
+ return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
+ }
- DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
+ private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {
- edits.addSink(new SinkFunction<WikipediaEditEvent>() {
- @Override
- public void invoke(WikipediaEditEvent value) throws Exception {
- throw new Exception("Expected test exception");
- }
- });
+ private final BlockingQueue<Object> events;
- try {
- env.execute();
- fail("Did not throw expected Exception.");
- } catch (Exception e) {
- assertNotNull(e.getCause());
- assertEquals("Expected test exception", e.getCause().getMessage());
- }
+ private CollectingSourceContext(BlockingQueue<Object> events) {
+ this.events = Objects.requireNonNull(events, "events");
+ }
- success = true;
- } else {
- LOG.info("Failed to connect to IRC server ({}/{}). Retrying in {} ms.",
- i + 1,
- numRetries,
- waitBetweenRetriesMillis);
+ @Override
+ public void collect(T element) {
+ events.offer(element);
+ }
+
+ @Override
+ public void collectWithTimestamp(T element, long timestamp) {
+ throw new UnsupportedOperationException();
- Thread.sleep(waitBetweenRetriesMillis);
- }
}
- if (success) {
- LOG.info("Successfully ran test.");
- } else {
- LOG.info("Skipped test, because not able to connect to IRC server.");
+ @Override
+ public void emitWatermark(Watermark mark) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ throw new UnsupportedOperationException();
}
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+ private static boolean canConnect(int timeout, TimeUnit unit) {
+ String host = WikipediaEditsSource.DEFAULT_HOST;
+ int port = WikipediaEditsSource.DEFAULT_PORT;
+
+ try (Socket s = new Socket()) {
+ s.connect(new InetSocketAddress(host, port), (int) unit.toMillis(timeout));
+ return s.isConnected();
+ } catch (Throwable ignored) {
+ }
+
+ return false;
}
}