You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/04 13:29:28 UTC

[2/2] flink git commit: [FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test

[FLINK-7762] [connector-wikiedits] Make WikipediaEditsSourceTest proper test

The WikipediaEditsSourceTest unnecessarily implements an integration
test that starts a FlinkMiniCluster and executes a small Flink program.

This commit changes the test to simply create a source and execute its
run() method in a separate thread until a single WikipediaEditEvent is received.

This closes #5102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7ca1055
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7ca1055
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7ca1055

Branch: refs/heads/master
Commit: e7ca1055afc46e4a6ccb3492758de36bfd6419a3
Parents: ec0c416
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 29 16:36:29 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Dec 4 14:29:01 2017 +0100

----------------------------------------------------------------------
 flink-contrib/flink-connector-wikiedits/pom.xml |   7 +-
 .../wikiedits/WikipediaEditsSourceTest.java     | 172 +++++++++++++------
 2 files changed, 125 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/pom.xml b/flink-contrib/flink-connector-wikiedits/pom.xml
index c13f93e..3908658 100644
--- a/flink-contrib/flink-connector-wikiedits/pom.xml
+++ b/flink-contrib/flink-connector-wikiedits/pom.xml
@@ -40,11 +40,16 @@ under the License.
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
 		<dependency>
 			<groupId>org.schwering</groupId>
 			<artifactId>irclib</artifactId>
 			<version>1.10</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e7ca1055/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
index 439aa36..f6fa8e0 100644
--- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
+++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java
@@ -18,19 +18,29 @@
 
 package org.apache.flink.streaming.connectors.wikiedits;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -40,71 +50,127 @@ public class WikipediaEditsSourceTest {
 
 	private static final Logger LOG = LoggerFactory.getLogger(WikipediaEditsSourceTest.class);
 
+	@Rule
+	public RetryRule retryRule = new RetryRule();
+
 	/**
-	 * NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test.
-	 *
-	 * <p>We first check the connection to the IRC server. If it fails, this test
-	 * is effectively ignored.
-	 *
-	 * @see <a href="http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html">Socks Proxy</a>
+	 * We first check the connection to the IRC server. If it fails, this test is ignored.
 	 */
-	@Test(timeout = 120 * 1000)
+	@Test
+	@RetryOnFailure(times = 1)
 	public void testWikipediaEditsSource() throws Exception {
-		final int numRetries = 5;
-		final int waitBetweenRetriesMillis = 2000;
-		final int connectTimeout = 1000;
+		if (canConnect(1, TimeUnit.SECONDS)) {
+			final Time testTimeout = Time.seconds(60);
+			final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();
+
+			ExecutorService executorService = null;
+			try {
+				executorService = Executors.newSingleThreadExecutor();
+				BlockingQueue<Object> collectedEvents = new ArrayBlockingQueue<>(1);
+				AtomicReference<Exception> asyncError = new AtomicReference<>();
+
+				// Execute the source in a different thread and collect events into the queue.
+				// We do this in a separate thread in order to not block the main test thread
+				// indefinitely in case that something bad happens (like not receiving any
+				// events)
+				executorService.execute(() -> {
+					try {
+						wikipediaEditsSource.run(new CollectingSourceContext<>(collectedEvents));
+					} catch (Exception e) {
+						boolean interrupted = e instanceof InterruptedException;
+						if (!interrupted) {
+							LOG.warn("Failure in WikipediaEditsSource", e);
+						}
+
+						asyncError.compareAndSet(null, e);
+					}
+				});
+
+				long deadline = deadlineNanos(testTimeout);
 
-		boolean success = false;
+				Object event = null;
+				Exception error = null;
 
-		for (int i = 0; i < numRetries && !success; i++) {
-			// Check connection
-			boolean canConnect = false;
+				// Check event or error
+				while (event == null && error == null && System.nanoTime() < deadline) {
+					event = collectedEvents.poll(1, TimeUnit.SECONDS);
+					error = asyncError.get();
+				}
+
+				if (error != null) {
+					// We don't use assertNull, because we want to include the error message
+					fail("Failure in WikipediaEditsSource: " + error.getMessage());
+				}
 
-			String host = WikipediaEditsSource.DEFAULT_HOST;
-			int port = WikipediaEditsSource.DEFAULT_PORT;
+				assertNotNull("Did not receive a WikipediaEditEvent within the desired timeout", event);
+				assertTrue("Received unexpected event " + event, event instanceof WikipediaEditEvent);
+			} finally {
+				wikipediaEditsSource.cancel();
 
-			try (Socket s = new Socket()) {
-				s.connect(new InetSocketAddress(host, port), connectTimeout);
-				canConnect = s.isConnected();
-			} catch (Throwable ignored) {
+				if (executorService != null) {
+					executorService.shutdownNow();
+					executorService.awaitTermination(1, TimeUnit.SECONDS);
+				}
 			}
+		} else {
+			LOG.info("Skipping test, because not able to connect to IRC server.");
+		}
+	}
 
-			if (canConnect) {
-				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().disableSysoutLogging();
+	private long deadlineNanos(Time testTimeout) {
+		return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
+	}
 
-				DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
+	private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {
 
-				edits.addSink(new SinkFunction<WikipediaEditEvent>() {
-					@Override
-					public void invoke(WikipediaEditEvent value) throws Exception {
-						throw new Exception("Expected test exception");
-					}
-				});
+		private final BlockingQueue<Object> events;
 
-				try {
-					env.execute();
-					fail("Did not throw expected Exception.");
-				} catch (Exception e) {
-					assertNotNull(e.getCause());
-					assertEquals("Expected test exception", e.getCause().getMessage());
-				}
+		private CollectingSourceContext(BlockingQueue<Object> events) {
+			this.events = Objects.requireNonNull(events, "events");
+		}
 
-				success = true;
-			} else {
-				LOG.info("Failed to connect to IRC server ({}/{}). Retrying in {} ms.",
-						i + 1,
-						numRetries,
-						waitBetweenRetriesMillis);
+		@Override
+		public void collect(T element) {
+			events.offer(element);
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			throw new UnsupportedOperationException();
 
-				Thread.sleep(waitBetweenRetriesMillis);
-			}
 		}
 
-		if (success) {
-			LOG.info("Successfully ran test.");
-		} else {
-			LOG.info("Skipped test, because not able to connect to IRC server.");
+		@Override
+		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void markAsTemporarilyIdle() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			throw new UnsupportedOperationException();
 		}
+
+		@Override
+		public void close() {
+		}
+
+	}
+
+	private static boolean canConnect(int timeout, TimeUnit unit) {
+		String host = WikipediaEditsSource.DEFAULT_HOST;
+		int port = WikipediaEditsSource.DEFAULT_PORT;
+
+		try (Socket s = new Socket()) {
+			s.connect(new InetSocketAddress(host, port), (int) unit.toMillis(timeout));
+			return s.isConnected();
+		} catch (Throwable ignored) {
+		}
+
+		return false;
 	}
 }