You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/05 12:37:23 UTC

flink git commit: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Repository: flink
Updated Branches:
  refs/heads/master efb40cfc5 -> 825ef3be3


[FLINK-3874] Rewrite Kafka JSON Table sink tests

This closes #2430.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/825ef3be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/825ef3be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/825ef3be

Branch: refs/heads/master
Commit: 825ef3be35ec9a85e800c5db5b8d3bbf5fa188a0
Parents: efb40cf
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Sat Aug 27 23:24:21 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Oct 5 14:36:17 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/Kafka08JsonTableSink.java  |   1 +
 .../kafka/Kafka08JsonTableSinkITCase.java       |  40 -----
 .../kafka/Kafka08JsonTableSinkTest.java         |  48 ++++++
 .../kafka/Kafka09JsonTableSinkITCase.java       |  39 -----
 .../kafka/Kafka09JsonTableSinkTest.java         |  47 ++++++
 .../connectors/kafka/KafkaTableSink.java        |   1 +
 .../kafka/KafkaTableSinkTestBase.java           | 146 +++++++------------
 7 files changed, 153 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5f869ec..b155576 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -27,6 +27,7 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
+
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.8
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
deleted file mode 100644
index f870adf..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
-	@Override
-	protected KafkaTableSink createTableSink() {
-		Kafka08JsonTableSink sink = new Kafka08JsonTableSink(
-			TOPIC,
-			createSinkProperties(),
-			createPartitioner());
-		return sink.configure(FIELD_NAMES, FIELD_TYPES);
-	}
-
-	protected DeserializationSchema<Row> createRowDeserializationSchema() {
-		return new JsonRowDeserializationSchema(
-			FIELD_NAMES, FIELD_TYPES);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..b1e6db9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+	@Override
+	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+				final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+		return new Kafka08JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<SerializationSchema<Row>> getSerializationSchema() {
+		return (Class) JsonRowSerializationSchema.class;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
deleted file mode 100644
index 74415f8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
-	@Override
-	protected KafkaTableSink createTableSink() {
-		Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
-			TOPIC,
-			createSinkProperties(),
-			createPartitioner());
-		return sink.configure(FIELD_NAMES, FIELD_TYPES);
-	}
-
-	protected DeserializationSchema<Row> createRowDeserializationSchema() {
-		return new JsonRowDeserializationSchema(
-			FIELD_NAMES, FIELD_TYPES);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..bfdcf68
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+	@Override
+	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+		return new Kafka09JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<SerializationSchema<Row>> getSerializationSchema() {
+		return (Class) JsonRowSerializationSchema.class;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 8f5e811..714d9cd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -42,6 +42,7 @@ public abstract class KafkaTableSink implements StreamTableSink<Row> {
 	protected final KafkaPartitioner<Row> partitioner;
 	protected String[] fieldNames;
 	protected TypeInformation[] fieldTypes;
+
 	/**
 	 * Creates KafkaTableSink
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 5e55b0a..e46ca08 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,123 +17,89 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.junit.Test;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;
 
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
 
-	protected final static String TOPIC = "customPartitioningTestTopic";
-	protected final static int PARALLELISM = 1;
-	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
-	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+	private final static String TOPIC = "testTopic";
+	private final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
+	private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+
+	private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
+	private final Properties properties = createSinkProperties();
+	@SuppressWarnings("unchecked")
+	private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
 
 	@Test
+	@SuppressWarnings("unchecked")
 	public void testKafkaTableSink() throws Exception {
-		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
-
-		createTestTopic(TOPIC, PARALLELISM, 1);
-		StreamExecutionEnvironment env = createEnvironment();
-
-		createProducingTopology(env);
-		createConsumingTopology(env);
+		DataStream dataStream = mock(DataStream.class);
+		KafkaTableSink kafkaTableSink = createTableSink();
+		kafkaTableSink.emitDataStream(dataStream);
 
-		tryExecute(env, "custom partitioning test");
-		deleteTestTopic(TOPIC);
-		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
+		verify(dataStream).addSink(kafkaProducer);
 	}
 
-	private StreamExecutionEnvironment createEnvironment() {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-		env.setRestartStrategy(RestartStrategies.noRestart());
-		env.getConfig().disableSysoutLogging();
-		return env;
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCreatedProducer() throws Exception {
+		DataStream dataStream = mock(DataStream.class);
+		KafkaTableSink kafkaTableSink = spy(createTableSink());
+		kafkaTableSink.emitDataStream(dataStream);
+
+		verify(kafkaTableSink).createKafkaProducer(
+			eq(TOPIC),
+			eq(properties),
+			any(getSerializationSchema()),
+			eq(partitioner));
 	}
 
-	private void createProducingTopology(StreamExecutionEnvironment env) {
-		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
-			private boolean running = true;
-
-			@Override
-			public void run(SourceContext<Row> ctx) throws Exception {
-				long cnt = 0;
-				while (running) {
-					Row row = new Row(2);
-					row.setField(0, cnt);
-					row.setField(1, "kafka-" + cnt);
-					ctx.collect(row);
-					cnt++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-				running = false;
-			}
-		})
-		.setParallelism(1);
-
-		KafkaTableSink kafkaTableSinkBase = createTableSink();
-
-		kafkaTableSinkBase.emitDataStream(stream);
+	@Test
+	public void testConfiguration() {
+		KafkaTableSink kafkaTableSink = createTableSink();
+		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+		assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
-	private void createConsumingTopology(StreamExecutionEnvironment env) {
-		DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
-
-		FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
-
-		env.addSource(source).setParallelism(PARALLELISM)
-			.map(new RichMapFunction<Row, Integer>() {
-				@Override
-				public Integer map(Row value) {
-					return (Integer) value.productElement(0);
-				}
-			}).setParallelism(PARALLELISM)
-
-			.addSink(new SinkFunction<Integer>() {
-				HashSet<Integer> ids = new HashSet<>();
-				@Override
-				public void invoke(Integer value) throws Exception {
-					ids.add(value);
-
-					if (ids.size() == 100) {
-						throw new SuccessException();
-					}
-				}
-			}).setParallelism(1);
-	}
+	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
+			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
-	protected KafkaPartitioner<Row> createPartitioner() {
-		return new CustomPartitioner();
-	}
+	protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
 
-	protected Properties createSinkProperties() {
-		return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
+	private KafkaTableSink createTableSink() {
+		return createTableSink(TOPIC, properties, partitioner, kafkaProducer);
 	}
 
-	protected abstract KafkaTableSink createTableSink();
-
-	protected abstract DeserializationSchema<Row> createRowDeserializationSchema();
-
+	private static Properties createSinkProperties() {
+		Properties properties = new Properties();
+		properties.setProperty("testKey", "testValue");
+		return properties;
+	}
 
-	public static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
+	private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
 		@Override
 		public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
 			return 0;