You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/13 08:24:15 UTC

[1/2] flink git commit: [hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)

Repository: flink
Updated Branches:
  refs/heads/master 15df71ba9 -> 3d5bca0ab


[hotfix][kafka] Undo DataGenerators changes (use inline kafka producer again)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/744f8ebb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/744f8ebb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/744f8ebb

Branch: refs/heads/master
Commit: 744f8ebb66b2a7288942be139cd7a7e6d1170c80
Parents: 15df71b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Oct 11 15:48:32 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Oct 12 14:03:14 2016 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         |   3 -
 .../connectors/kafka/Kafka09ITCase.java         |   9 -
 .../connectors/kafka/KafkaConsumerTestBase.java | 242 +------------------
 .../kafka/testutils/DataGenerators.java         | 165 ++++++-------
 4 files changed, 72 insertions(+), 347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index af6d254..78fc1c6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -115,9 +115,6 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);
-	/*	FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<T> sink = FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, topic, serSchema, props, partitioner);
-		sink.setFlushOnCheckpoint(true);
-		return sink; */
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index fd167a0..b9ec18a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -35,15 +35,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumer() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(false);
-//	}
-
-//	@Test(timeout = 60000)
-//	public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception {
-//		runExplicitPunctuatedWMgeneratingConsumerTest(true);
-//	}
 
 	@Test(timeout = 60000)
 	public void testKeyValueSupport() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3c967ba..0810a3e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -31,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -39,13 +36,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.table.StreamTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
@@ -68,7 +62,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -92,7 +85,6 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.junit.Assert;
@@ -116,7 +108,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -517,7 +508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		// launch a producer thread
 		DataGenerators.InfiniteStringsGenerator generator =
-				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic, flinkPort);
+				new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
 		generator.start();
 
 		// launch a consumer asynchronously
@@ -571,7 +562,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		assertTrue(failueCause.getMessage().contains("Job was cancelled"));
 
 		if (generator.isAlive()) {
-			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "String generator");
 			generator.shutdown();
 			generator.join();
 		}
@@ -1723,234 +1713,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state;
 		}
 	}
-
-	/////////////			Testing the Kafka consumer with embeded watermark generation functionality			///////////////
-
-//	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
-//	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
-//
-//		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
-//		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
-//
-//		final Map<String, Boolean> topics = new HashMap<>();
-//		topics.put(topic1, false);
-//		topics.put(topic2, emptyPartition);
-//
-//		final int noOfTopcis = topics.size();
-//		final int partitionsPerTopic = 1;
-//		final int elementsPerPartition = 100 + 1;
-//
-//		final int totalElements = emptyPartition ?
-//			partitionsPerTopic * elementsPerPartition :
-//			noOfTopcis * partitionsPerTopic * elementsPerPartition;
-//
-//		createTestTopic(topic1, partitionsPerTopic, 1);
-//		createTestTopic(topic2, partitionsPerTopic, 1);
-//
-//		final StreamExecutionEnvironment env =
-//			StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-//		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-//		env.setParallelism(partitionsPerTopic);
-//		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
-//		env.getConfig().disableSysoutLogging();
-//
-//		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
-//
-//		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
-//		producerProperties.setProperty("retries", "0");
-//
-//		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
-//
-//		List<String> topicTitles = new ArrayList<>(topics.keySet());
-//		runPunctuatedComsumer(env, topicTitles, totalElements, longIntType);
-//
-//		executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest");
-//
-//		for(String topic: topicTitles) {
-//			deleteTestTopic(topic);
-//		}
-//	}
-//
-//	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
-//		try {
-//			tryExecutePropagateExceptions(env, execName);
-//		}
-//		catch (ProgramInvocationException | JobExecutionException e) {
-//			// look for NotLeaderForPartitionException
-//			Throwable cause = e.getCause();
-//
-//			// search for nested SuccessExceptions
-//			int depth = 0;
-//			while (cause != null && depth++ < 20) {
-//				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
-//					throw (Exception) cause;
-//				}
-//				cause = cause.getCause();
-//			}
-//			throw e;
-//		}
-//	}
-//
-//	private void putDataInTopics(StreamExecutionEnvironment env,
-//								Properties producerProperties,
-//								final int elementsPerPartition,
-//								Map<String, Boolean> topics,
-//								TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
-//		if(topics.size() != 2) {
-//			throw new RuntimeException("This method accepts two topics as arguments.");
-//		}
-//
-//		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
-//			new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
-//
-//		DataStream<Tuple2<Long, Integer>> stream = env
-//			.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
-//				private boolean running = true;
-//
-//				@Override
-//				public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
-//					int topic = 0;
-//					int currentTs = 1;
-//
-//					while (running && currentTs < elementsPerPartition) {
-//						long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
-//						ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
-//						currentTs++;
-//					}
-//
-//					Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
-//					ctx.collect(toWrite2);
-//				}
-//
-//				@Override
-//				public void cancel() {
-//				running = false;
-//			}
-//			}).setParallelism(1);
-//
-//		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
-//
-//		stream = stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-//			@Override
-//			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-//				return value;
-//			}
-//		}).setParallelism(1);
-//		kafkaServer.produceIntoKafka(stream, topicsL.get(0).getKey(),
-//			new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null).setParallelism(1);
-//
-//		if(!topicsL.get(1).getValue()) {
-//			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
-//
-//				@Override
-//				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
-//					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
-//					return new Tuple2<>(timestamp, 1);
-//				}
-//			}).setParallelism(1).addSink(kafkaServer.produceIntoKafka(topicsL.get(1).getKey(),
-//				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
-//		}
-//	}
-
-	private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedComsumer(StreamExecutionEnvironment env,
-																		List<String> topics,
-																		final int totalElementsToExpect,
-																		TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
-
-		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
-			new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
-
-		Properties props = new Properties();
-		props.putAll(standardProps);
-		props.putAll(secureProps);
-		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
-			.getConsumer(topics, sourceSchema, props)
-			.assignTimestampsAndWatermarks(new TestPunctuatedTSExtractor());
-
-		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
-
-		return consuming
-			.transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
-			.addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
-
-				private int elementCount = 0;
-
-				@Override
-				public void invoke(Tuple2<Long, Integer> value) throws Exception {
-					elementCount++;
-					if (elementCount == totalElementsToExpect) {
-						throw new SuccessException();
-					}
-				}
-
-				@Override
-				public void close() throws Exception {
-					super.close();
-				}
-			});
-	}
-
-	/** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */
-	private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {
-
-		@Override
-		public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
-			return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
-		}
-
-		@Override
-		public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
-			return element.f0;
-		}
-	}
-
-	private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
-
-		private long lastReceivedWatermark = Long.MIN_VALUE;
-
-		private Map<Integer, Boolean> isEligible = new HashMap<>();
-		private Map<Integer, Long> perPartitionMaxTs = new HashMap<>();
-
-		WMTestingOperator() {
-			isEligible = new HashMap<>();
-			perPartitionMaxTs = new HashMap<>();
-		}
-
-		@Override
-		public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception {
-			int partition = element.getValue().f1;
-			Long maxTs = perPartitionMaxTs.get(partition);
-			if(maxTs == null || maxTs < element.getValue().f0) {
-				perPartitionMaxTs.put(partition, element.getValue().f0);
-				isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			int partition = -1;
-			long minTS = Long.MAX_VALUE;
-			for (Integer part : perPartitionMaxTs.keySet()) {
-				Long ts = perPartitionMaxTs.get(part);
-				if (ts < minTS && isEligible.get(part)) {
-					partition = part;
-					minTS = ts;
-					lastReceivedWatermark = ts;
-				}
-			}
-			isEligible.put(partition, false);
-
-			assertEquals(minTS, mark.getTimestamp());
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			perPartitionMaxTs.clear();
-			isEligible.clear();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/744f8ebb/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 3f035fd..ba75212 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,74 +18,35 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.io.Serializable;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.Random;
 
 @SuppressWarnings("serial")
 public class DataGenerators {
-	
-	public static void generateLongStringTupleSequence(StreamExecutionEnvironment env,
-													   KafkaTestEnvironment testServer, String topic,
-													   int numPartitions,
-													   final int from, final int to) throws Exception {
 
-		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
-
-		env.setParallelism(numPartitions);
-		env.getConfig().disableSysoutLogging();
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		
-		DataStream<Tuple2<Integer, Integer>> stream =env.addSource(
-				new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
-
-					private volatile boolean running = true;
-
-					@Override
-					public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
-						int cnt = from;
-						int partition = getRuntimeContext().getIndexOfThisSubtask();
-
-						while (running && cnt <= to) {
-							ctx.collect(new Tuple2<>(partition, cnt));
-							cnt++;
-						}
-					}
-
-					@Override
-					public void cancel() {
-						running = false;
-					}
-				});
-
-		testServer.produceIntoKafka(stream, topic,
-				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(resultType, env.getConfig())),
-				FlinkKafkaProducerBase.getPropertiesFromBrokerList(testServer.getBrokerConnectionString()),
-				new Tuple2Partitioner(numPartitions)
-		);
-
-		env.execute("Data generator (Int, Int) stream to topic " + topic);
-	}
-
-	// ------------------------------------------------------------------------
-	
 	public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment env,
 														 KafkaTestEnvironment testServer, String topic,
 														 final int numPartitions,
@@ -105,9 +66,9 @@ public class DataGenerators {
 						// create a sequence
 						int[] elements = new int[numElements];
 						for (int i = 0, val = getRuntimeContext().getIndexOfThisSubtask();
-								i < numElements;
-								i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
-							
+							 i < numElements;
+							 i++, val += getRuntimeContext().getNumberOfParallelSubtasks()) {
+
 							elements[i] = val;
 						}
 
@@ -116,7 +77,7 @@ public class DataGenerators {
 							Random rnd = new Random();
 							for (int i = 0; i < elements.length; i++) {
 								int otherPos = rnd.nextInt(elements.length);
-								
+
 								int tmp = elements[i];
 								elements[i] = elements[otherPos];
 								elements[otherPos] = tmp;
@@ -142,7 +103,7 @@ public class DataGenerators {
 		if(secureProps != null) {
 			props.putAll(testServer.getSecureProperties());
 		}
-		
+
 		stream = stream.rebalance();
 		testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
@@ -156,63 +117,55 @@ public class DataGenerators {
 
 		env.execute("Scrambles int sequence generator");
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
-	public static class InfiniteStringsGenerator extends Thread implements Serializable {
 
-		private transient KafkaTestEnvironment server;
-		
-		private final String topic;
+	public static class InfiniteStringsGenerator extends Thread {
 
-		private final int flinkPort;
+		private final KafkaTestEnvironment server;
+
+		private final String topic;
 
 		private volatile Throwable error;
-		
+
 		private volatile boolean running = true;
 
-		
-		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic, int flinkPort) {
+
+		public InfiniteStringsGenerator(KafkaTestEnvironment server, String topic) {
 			this.server = server;
 			this.topic = topic;
-			this.flinkPort = flinkPort;
 		}
 
 		@Override
 		public void run() {
 			// we manually feed data into the Kafka sink
-			FlinkKafkaProducerBase<String> producer = null;
+			RichFunction producer = null;
 			try {
-				final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-				DataStream<String> stream = env.addSource(new SourceFunction<String>() {
-					@Override
-					public void run(SourceContext<String> ctx) throws Exception {
-						final StringBuilder bld = new StringBuilder();
-						final Random rnd = new Random();
-						while (running) {
-							bld.setLength(0);
-							int len = rnd.nextInt(100) + 1;
-							for (int i = 0; i < len; i++) {
-								bld.append((char) (rnd.nextInt(20) + 'a'));
-							}
+				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
+				producerProperties.setProperty("retries", "3");
+				StreamTransformation<String> mockTransform = new MockStreamTransformation();
+				DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
+				DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						producerProperties, new FixedPartitioner<String>());
+				StreamSink<String> producerOperator = sink.getTransformation().getOperator();
+				producer = (RichFunction) producerOperator.getUserFunction();
+				producer.setRuntimeContext(new MockRuntimeContext(1,0));
+				producer.open(new Configuration());
 
-							String next = bld.toString();
-							ctx.collect(next);
-						}
-					}
+				final StringBuilder bld = new StringBuilder();
+				final Random rnd = new Random();
 
-					@Override
-					public void cancel() {
-						running = false;
+				while (running) {
+					bld.setLength(0);
+
+					int len = rnd.nextInt(100) + 1;
+					for (int i = 0; i < len; i++) {
+						bld.append((char) (rnd.nextInt(20) + 'a') );
 					}
-				});
 
-				Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(server.getBrokerConnectionString());
-				producerProperties.setProperty("retries", "3");
-				server.produceIntoKafka(stream, topic,
-						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
-						producerProperties, new FixedPartitioner<String>());
-				env.execute("String generator");
+					String next = bld.toString();
+					producerOperator.processElement(new StreamRecord<>(next));
+				}
 			}
 			catch (Throwable t) {
 				this.error = t;
@@ -228,14 +181,38 @@ public class DataGenerators {
 				}
 			}
 		}
-		
+
 		public void shutdown() {
 			this.running = false;
 			this.interrupt();
 		}
-		
+
 		public Throwable getError() {
 			return this.error;
 		}
+
+		private static class MockStreamTransformation extends StreamTransformation<String> {
+			public MockStreamTransformation() {
+				super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+			}
+
+			@Override
+			public void setChainingStrategy(ChainingStrategy strategy) {
+
+			}
+
+			@Override
+			public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+				return null;
+			}
+		}
+
+		public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+			@Override
+			public JobExecutionResult execute(String jobName) throws Exception {
+				return null;
+			}
+		}
 	}
-}
+}
\ No newline at end of file


[2/2] flink git commit: [hotfix][kafka] Backport Kafka09FetcherTest for Kafka010

Posted by rm...@apache.org.
[hotfix][kafka] Backport Kafka09FetcherTest for Kafka010

This closes #2627


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d5bca0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d5bca0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d5bca0a

Branch: refs/heads/master
Commit: 3d5bca0abcb6071c2eedfedce36b883a6f605687
Parents: 744f8eb
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Oct 12 14:03:01 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Oct 13 10:23:37 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   | 300 +++++++++++++++++++
 1 file changed, 300 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d5bca0a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
new file mode 100644
index 0000000..8f0b170
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Kafka010Fetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Kafka010Fetcher.class)
+public class Kafka010FetcherTest {
+
+    /**
+     * Verifies that an offset commit returns promptly while the fetcher thread is
+     * parked inside the consumer's {@code poll()} call.
+     *
+     * <p>The mocked consumer's {@code poll()} waits on a latch that is only triggered by
+     * {@code wakeup()}. Once the fetcher thread has entered {@code poll()}, a second thread
+     * calls {@code commitSpecificOffsetsToKafka()} and has to terminate within 30 seconds.
+     *
+     * @throws Exception if the fetcher thread or the committer thread failed, or if
+     *                   waiting on one of the threads was interrupted
+     */
+    @Test
+    public void testCommitDoesNotBlock() throws Exception {
+
+        // test data
+        final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+        testCommitData.put(testPartition, 11L);
+
+        // to synchronize when the consumer is in its blocking method
+        final OneShotLatch sync = new OneShotLatch();
+
+        // ----- the mock consumer with blocking poll calls ----
+        final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+                // signal the test thread that poll() was entered, then park until wakeup()
+                sync.trigger();
+                blockerLatch.await();
+                return ConsumerRecords.empty();
+            }
+        });
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                // wakeup() releases one pending poll()
+                blockerLatch.trigger();
+                return null;
+            }
+        }).when(mockConsumer).wakeup();
+
+        // make sure the fetcher creates the mock consumer
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = mock(SourceContext.class);
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // wait until the fetcher has reached the method of interest
+        sync.await();
+
+        // ----- trigger the offset commit -----
+
+        final AtomicReference<Throwable> commitError = new AtomicReference<>();
+        final Thread committer = new Thread("committer runner") {
+            @Override
+            public void run() {
+                try {
+                    fetcher.commitSpecificOffsetsToKafka(testCommitData);
+                } catch (Throwable t) {
+                    commitError.set(t);
+                }
+            }
+        };
+        committer.start();
+
+        // ----- ensure that the committer finishes in time  -----
+        committer.join(30000);
+        assertFalse("The committer did not finish in time", committer.isAlive());
+
+        // ----- test done, wait till the fetcher is done for a clean shutdown -----
+        fetcher.cancel();
+        fetcherRunner.join();
+
+        // check that there were no errors in the fetcher
+        final Throwable caughtError = error.get();
+        if (caughtError != null) {
+            throw new Exception("Exception in the fetcher", caughtError);
+        }
+
+        // a commit that terminated quickly only because it threw must not pass the test
+        final Throwable caughtCommitError = commitError.get();
+        if (caughtCommitError != null) {
+            throw new Exception("Exception in the committer", caughtCommitError);
+        }
+    }
+
+    @Test
+    public void ensureOffsetsGetCommitted() throws Exception {
+
+        // test data
+        final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42);
+        final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99);
+
+        final Map<KafkaTopicPartition, Long> testCommitData1 = new HashMap<>();
+        testCommitData1.put(testPartition1, 11L);
+        testCommitData1.put(testPartition2, 18L);
+
+        final Map<KafkaTopicPartition, Long> testCommitData2 = new HashMap<>();
+        testCommitData2.put(testPartition1, 19L);
+        testCommitData2.put(testPartition2, 28L);
+
+        final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitStore = new LinkedBlockingQueue<>();
+
+
+        // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ----
+
+        final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+        KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) throws InterruptedException {
+                blockerLatch.await();
+                return ConsumerRecords.empty();
+            }
+        });
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                blockerLatch.trigger();
+                return null;
+            }
+        }).when(mockConsumer).wakeup();
+
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                @SuppressWarnings("unchecked")
+                Map<TopicPartition, OffsetAndMetadata> offsets =
+                        (Map<TopicPartition, OffsetAndMetadata>) invocation.getArguments()[0];
+
+                OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1];
+
+                commitStore.add(offsets);
+                callback.onComplete(offsets, null);
+
+                return null;
+            }
+        }).when(mockConsumer).commitAsync(
+                Mockito.<Map<TopicPartition, OffsetAndMetadata>>any(), any(OffsetCommitCallback.class));
+
+        // make sure the fetcher creates the mock consumer
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = mock(SourceContext.class);
+        List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
+        StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
+
+        final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
+                sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final Thread fetcherRunner = new Thread("fetcher runner") {
+
+            @Override
+            public void run() {
+                try {
+                    fetcher.runFetchLoop();
+                } catch (Throwable t) {
+                    error.set(t);
+                }
+            }
+        };
+        fetcherRunner.start();
+
+        // ----- trigger the first offset commit -----
+
+        fetcher.commitSpecificOffsetsToKafka(testCommitData1);
+        Map<TopicPartition, OffsetAndMetadata> result1 = commitStore.take();
+
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : result1.entrySet()) {
+            TopicPartition partition = entry.getKey();
+            if (partition.topic().equals("test")) {
+                assertEquals(42, partition.partition());
+                assertEquals(12L, entry.getValue().offset());
+            }
+            else if (partition.topic().equals("another")) {
+                assertEquals(99, partition.partition());
+                assertEquals(18L, entry.getValue().offset());
+            }
+        }
+
+        // ----- trigger the second offset commit -----
+
+        fetcher.commitSpecificOffsetsToKafka(testCommitData2);
+        Map<TopicPartition, OffsetAndMetadata> result2 = commitStore.take();
+
+        for (Entry<TopicPartition, OffsetAndMetadata> entry : result2.entrySet()) {
+            TopicPartition partition = entry.getKey();
+            if (partition.topic().equals("test")) {
+                assertEquals(42, partition.partition());
+                assertEquals(20L, entry.getValue().offset());
+            }
+            else if (partition.topic().equals("another")) {
+                assertEquals(99, partition.partition());
+                assertEquals(28L, entry.getValue().offset());
+            }
+        }
+
+        // ----- test done, wait till the fetcher is done for a clean shutdown -----
+        fetcher.cancel();
+        fetcherRunner.join();
+
+        // check that there were no errors in the fetcher
+        final Throwable caughtError = error.get();
+        if (caughtError != null) {
+            throw new Exception("Exception in the fetcher", caughtError);
+        }
+    }
+}
\ No newline at end of file