You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 13:29:27 UTC

[1/2] flink git commit: [FLINK-8167] [connector-wikiedits] Harden WikipediaEditsSource

Repository: flink
Updated Branches:
  refs/heads/master 80348d653 -> e7ca1055a


[FLINK-8167] [connector-wikiedits] Harden WikipediaEditsSource

- Add minor eager sanity checks (null checks on constructor and method arguments)
- Use UUID suffix for nickname. As reported in FLINK-8167, the current
  nickname suffix (a random number below 1000) can result in nickname
  clashes, which lead to test failures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec0c416a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec0c416a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec0c416a

Branch: refs/heads/master
Commit: ec0c416a69398c9ef3bfd2be51d0dc6677e19f7b
Parents: 80348d6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 29 16:28:18 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:28:51 2017 +0100

----------------------------------------------------------------------
 .../wikiedits/WikipediaEditEventIrcStream.java  | 44 +++++++++----------
 .../wikiedits/WikipediaEditsSource.java         | 46 ++++++++------------
 2 files changed, 41 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec0c416a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
index 81dfa7b..84d89dd 100644
--- a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
+++ b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditEventIrcStream.java
@@ -26,10 +26,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-class WikipediaEditEventIrcStream {
+class WikipediaEditEventIrcStream implements AutoCloseable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditEventIrcStream.class);
 
@@ -37,11 +39,11 @@ class WikipediaEditEventIrcStream {
 	private final BlockingQueue<WikipediaEditEvent> edits =
 			new ArrayBlockingQueue<>(128);
 
-	/** IRC connection (Thread). */
+	/** IRC connection (NOTE: this is a separate Thread). */
 	private IRCConnection conn;
 
 	WikipediaEditEventIrcStream(String host, int port) {
-		final String nick = "flink-bot-" + (int) (Math.random() * 1000);
+		final String nick = "flink-bot-" + UUID.randomUUID().toString();
 		this.conn = new IRCConnection(host, new int[] { port}, "", nick, nick, nick);
 		conn.addIRCEventListener(new WikipediaIrcChannelListener(edits));
 		conn.setEncoding("UTF-8");
@@ -51,25 +53,18 @@ class WikipediaEditEventIrcStream {
 		conn.setName("WikipediaEditEventIrcStreamThread");
 	}
 
-	void start() throws IOException {
-		if (!conn.isConnected()) {
-			conn.connect();
-		}
+	BlockingQueue<WikipediaEditEvent> getEdits() {
+		return edits;
 	}
 
-	void stop() throws InterruptedException {
-		if (conn.isConnected()) {
+	void connect() throws IOException {
+		if (!conn.isConnected()) {
+			conn.connect();
 		}
-
-		conn.interrupt();
-		conn.join(5 * 1000);
-	}
-
-	BlockingQueue<WikipediaEditEvent> getEdits() {
-		return edits;
 	}
 
 	void join(String channel) {
+		Objects.requireNonNull(channel, "channel");
 		conn.send("JOIN " + channel);
 	}
 
@@ -77,6 +72,15 @@ class WikipediaEditEventIrcStream {
 		conn.send("PART " + channel);
 	}
 
+	@Override
+	public void close() throws Exception {
+		if (conn != null && conn.isConnected()) {
+			conn.doQuit();
+			conn.close();
+			conn.join(5 * 1000);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// IRC channel listener
 	// ------------------------------------------------------------------------
@@ -85,12 +89,8 @@ class WikipediaEditEventIrcStream {
 
 		private final BlockingQueue<WikipediaEditEvent> edits;
 
-		public WikipediaIrcChannelListener(BlockingQueue<WikipediaEditEvent> edits) {
-			if (edits == null) {
-				throw new NullPointerException();
-			}
-
-			this.edits = edits;
+		WikipediaIrcChannelListener(BlockingQueue<WikipediaEditEvent> edits) {
+			this.edits = Objects.requireNonNull(edits, "edits");
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ec0c416a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
index a10abdb..4498beb 100644
--- a/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
+++ b/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.connectors.wikiedits;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -44,8 +44,6 @@ public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent>
 
 	private volatile boolean isRunning = true;
 
-	private WikipediaEditEventIrcStream ircStream;
-
 	/**
 	 * Creates a source reading {@link WikipediaEditEvent} instances from the
 	 * IRC channel <code>#en.wikipedia</code>.
@@ -72,33 +70,27 @@ public class WikipediaEditsSource extends RichSourceFunction<WikipediaEditEvent>
 	public WikipediaEditsSource(String host, int port, String channel) {
 		this.host = host;
 		this.port = port;
-		this.channel = channel;
+		this.channel = Objects.requireNonNull(channel);
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		ircStream = new WikipediaEditEventIrcStream(host, port);
-		ircStream.start();
-		ircStream.join(channel);
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (ircStream != null) {
-			ircStream.leave(channel);
-			ircStream.stop();
-		}
-	}
-
-	@Override
-	public void run(SourceContext ctx) throws Exception {
-		while (isRunning) {
-			// Query for the next edit event
-			WikipediaEditEvent edit = ircStream.getEdits()
-					.poll(100, TimeUnit.MILLISECONDS);
-
-			if (edit != null) {
-				ctx.collect(edit);
+	public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {
+		try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {
+			// Open connection and join channel
+			ircStream.connect();
+			ircStream.join(channel);
+
+			try {
+				while (isRunning) {
+					// Query for the next edit event
+					WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
+
+					if (edit != null) {
+						ctx.collect(edit);
+					}
+				}
+			} finally {
+				ircStream.leave(channel);
 			}
 		}
 	}


[2/2] flink git commit: [FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test

Posted by ch...@apache.org.
[FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test

The WikipediaEditsSourceTest unnecessarily implements an integration
test that starts a FlinkMiniCluster and executes a small Flink program.

This change replaces it with a plain unit test that simply creates a
source and executes run in a separate thread until a single
WikipediaEditEvent is received.

This closes #5102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7ca1055
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7ca1055
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7ca1055

Branch: refs/heads/master
Commit: e7ca1055afc46e4a6ccb3492758de36bfd6419a3
Parents: ec0c416
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 29 16:36:29 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:29:01 2017 +0100

----------------------------------------------------------------------
 flink-contrib/flink-connector-wikiedits/pom.xml |   7 +-
 .../wikiedits/WikipediaEditsSourceTest.java     | 172 +++++++++++++------
 2 files changed, 125 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index c13f93e..3908658 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -40,11 +40,16 @@ under the License.
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
 		<dependency>
 			<groupId>org.schwering</groupId>
 			<artifactId>irclib</artifactId>
 			<version>1.10</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
index 439aa36..f6fa8e0 100644
--- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
+++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
@@ -18,19 +18,29 @@
 
 package org.apache.flink.streaming.connectors.wikiedits;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -40,71 +50,127 @@ public class WikipediaEditsSourceTest {
 
 	private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditsSourceTest.class);
 
+	@Rule
+	public RetryRule retryRule = new RetryRule();
+
 	/**
-	 * NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test.
-	 *
-	 * <p>We first check the connection to the IRC server. If it fails, this test
-	 * is effectively ignored.
-	 *
-	 * @see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html">Socks Proxy</a>
+	 * We first check the connection to the IRC server. If it fails, this test is ignored.
 	 */
-	@Test(timeout = 120 * 1000)
+	@Test
+	@RetryOnFailure(times = 1)
 	public void testWikipediaEditsSource() throws Exception {
-		final int numRetries = 5;
-		final int waitBetweenRetriesMillis = 2000;
-		final int connectTimeout = 1000;
+		if (canConnect(1, TimeUnit.SECONDS)) {
+			final Time testTimeout = Time.seconds(60);
+			final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();
+
+			ExecutorService executorService = null;
+			try {
+				executorService = Executors.newSingleThreadExecutor();
+				BlockingQueue<Object> collectedEvents = new ArrayBlockingQueue<>(1);
+				AtomicReference<Exception> asyncError = new AtomicReference<>();
+
+				// Execute the source in a different thread and collect events into the queue.
+				// We do this in a separate thread in order to not block the main test thread
+				// indefinitely in case something bad happens (like not receiving any
+				// events)
+				executorService.execute(() -> {
+					try {
+						wikipediaEditsSource.run(new CollectingSourceContext<>(collectedEvents));
+					} catch (Exception e) {
+						boolean interrupted = e instanceof InterruptedException;
+						if (!interrupted) {
+							LOG.warn("Failure in WikipediaEditsSource", e);
+						}
+
+						asyncError.compareAndSet(null, e);
+					}
+				});
+
+				long deadline = deadlineNanos(testTimeout);
 
-		boolean success = false;
+				Object event = null;
+				Exception error = null;
 
-		for (int i = 0; i < numRetries && !success; i++) {
-			// Check connection
-			boolean canConnect = false;
+				// Check event or error
+				while (event == null && error == null && System.nanoTime() < deadline) {
+					event = collectedEvents.poll(1, TimeUnit.SECONDS);
+					error = asyncError.get();
+				}
+
+				if (error != null) {
+					// We don't use assertNull, because we want to include the error message
+					fail("Failure in WikipediaEditsSource: " + error.getMessage());
+				}
 
-			String host = WikipediaEditsSource.DEFAULT_HOST;
-			int port = WikipediaEditsSource.DEFAULT_PORT;
+				assertNotNull("Did not receive a WikipediaEditEvent within the desired timeout", event);
+				assertTrue("Received unexpected event " + event, event instanceof WikipediaEditEvent);
+			} finally {
+				wikipediaEditsSource.cancel();
 
-			try (Socket s = new Socket()) {
-				s.connect(new InetSocketAddress(host, port), connectTimeout);
-				canConnect = s.isConnected();
-			} catch (Throwable ignored) {
+				if (executorService != null) {
+					executorService.shutdownNow();
+					executorService.awaitTermination(1, TimeUnit.SECONDS);
+				}
 			}
+		} else {
+			LOG.info("Skipping test, because not able to connect to IRC server.");
+		}
+	}
 
-			if (canConnect) {
-				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().disableSysoutLogging();
+	private long deadlineNanos(Time testTimeout) {
+		return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
+	}
 
-				DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
+	private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {
 
-				edits.addSink(new SinkFunction<WikipediaEditEvent>() {
-					@Override
-					public void invoke(WikipediaEditEvent value) throws Exception {
-						throw new Exception("Expected test exception");
-					}
-				});
+		private final BlockingQueue<Object> events;
 
-				try {
-					env.execute();
-					fail("Did not throw expected Exception.");
-				} catch (Exception e) {
-					assertNotNull(e.getCause());
-					assertEquals("Expected test exception", e.getCause().getMessage());
-				}
+		private CollectingSourceContext(BlockingQueue<Object> events) {
+			this.events = Objects.requireNonNull(events, "events");
+		}
 
-				success = true;
-			} else {
-				LOG.info("Failed to connect to IRC server ({}/{}). Retrying in {} ms.",
-						i + 1,
-						numRetries,
-						waitBetweenRetriesMillis);
+		@Override
+		public void collect(T element) {
+			events.offer(element);
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			throw new UnsupportedOperationException();
 
-				Thread.sleep(waitBetweenRetriesMillis);
-			}
 		}
 
-		if (success) {
-			LOG.info("Successfully ran test.");
-		} else {
-			LOG.info("Skipped test, because not able to connect to IRC server.");
+		@Override
+		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			throw new UnsupportedOperationException();
 		}
+
+		@Override
+		public void close() {
+		}
+
+	}
+
+	private static boolean canConnect(int timeout, TimeUnit unit) {
+		String host = WikipediaEditsSource.DEFAULT_HOST;
+		int port = WikipediaEditsSource.DEFAULT_PORT;
+
+		try (Socket s = new Socket()) {
+			s.connect(new InetSocketAddress(host, port), (int) unit.toMillis(timeout));
+			return s.isConnected();
+		} catch (Throwable ignored) {
+		}
+
+		return false;
 	}
 }