You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/23 15:25:23 UTC
[1/2] flink git commit: [FLINK-2744] [kafka connector] Reduce number
of concurrent test forks to improve test stability on CI environment.
Repository: flink
Updated Branches:
refs/heads/master 1f17ff540 -> ea3b5efdd
[FLINK-2744] [kafka connector] Reduce number of concurrent test forks to improve test stability on CI environment.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44fe370d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44fe370d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44fe370d
Branch: refs/heads/master
Commit: 44fe370d2233f37664117846e18bb7ebf4c3c9fe
Parents: 1f17ff5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 23 12:18:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 23 14:04:37 2015 +0200
----------------------------------------------------------------------
.../flink-connector-kafka/pom.xml | 13 +++++++++++++
1 file changed, 13 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/44fe370d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
index 528fc88..f098d9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -114,4 +114,17 @@ under the License.
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
+ <forkCount>1</forkCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
[2/2] flink git commit: [tests] Improve Kafka concurrent
producer/consumer test by allowing retries when connecting to non-leader
broker.
Posted by se...@apache.org.
[tests] Improve Kafka concurrent producer/consumer test by allowing retries when connecting to non-leader broker.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea3b5efd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea3b5efd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea3b5efd
Branch: refs/heads/master
Commit: ea3b5efdddd6c5cf769dddcbcfef16b75574a59f
Parents: 44fe370
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 23 13:31:13 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 23 14:18:11 2015 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerTestBase.java | 36 +++++++++++++++++---
.../connectors/kafka/KafkaTestBase.java | 28 +++++++++++++++
2 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea3b5efd/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ed1644c..5016e7e 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -64,11 +65,14 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOn
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Assert;
+import org.junit.Rule;
import scala.collection.Seq;
import java.lang.reflect.Field;
@@ -93,8 +97,10 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
-
+
+ @Rule
+ public RetryRule retryRule = new RetryRule();
+
// ------------------------------------------------------------------------
// Required methods by the abstract test base
// ------------------------------------------------------------------------
@@ -242,11 +248,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
* <pre>
* (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating sink)
* </pre>
+ *
+ * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
+ * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
+ * cause the test to fail.
*/
+ @RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
- final String topic = "concurrentProducerConsumerTopic";
+ final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
final int parallelism = 3;
final int elementsPerPartition = 100;
final int totalElements = parallelism * elementsPerPartition;
@@ -256,7 +267,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.setParallelism(parallelism);
- env.setNumberOfExecutionRetries(0);
env.getConfig().disableSysoutLogging();
TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
@@ -331,7 +341,23 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}).setParallelism(1);
- tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
+ try {
+ tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
+ }
+ catch (ProgramInvocationException | JobExecutionException e) {
+ // look for NotLeaderForPartitionException
+ Throwable cause = e.getCause();
+
+ // walk the cause chain (bounded depth) looking for a nested NotLeaderForPartitionException
+ int depth = 0;
+ while (cause != null && depth++ < 20) {
+ if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+ throw (Exception) cause;
+ }
+ cause = cause.getCause();
+ }
+ throw e;
+ }
LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
http://git-wip-us.apache.org/repos/asf/flink/blob/ea3b5efd/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index cfcee35..cfc104b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -316,6 +316,28 @@ public abstract class KafkaTestBase extends TestLogger {
}
}
+ protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String name) throws Exception {
+ try {
+ see.execute(name);
+ }
+ catch (ProgramInvocationException | JobExecutionException root) {
+ Throwable cause = root.getCause();
+
+ // search for nested SuccessExceptions
+ int depth = 0;
+ while (!(cause instanceof SuccessException)) {
+ if (cause == null || depth++ == 20) {
+ throw root;
+ }
+ else {
+ cause = cause.getCause();
+ }
+ }
+ }
+ }
+
+
+
protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
// create topic with one client
@@ -331,6 +353,12 @@ public abstract class KafkaTestBase extends TestLogger {
// validate that the topic has been created
final long deadline = System.currentTimeMillis() + 30000;
do {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ // interruption is swallowed here; note that the interrupted state is NOT restored
+ }
List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
if (partitions != null && partitions.size() > 0) {
return;