You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/05 12:37:23 UTC
flink git commit: [FLINK-3874] Rewrite Kafka JSON Table sink tests
Repository: flink
Updated Branches:
refs/heads/master efb40cfc5 -> 825ef3be3
[FLINK-3874] Rewrite Kafka JSON Table sink tests
This closes #2430.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/825ef3be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/825ef3be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/825ef3be
Branch: refs/heads/master
Commit: 825ef3be35ec9a85e800c5db5b8d3bbf5fa188a0
Parents: efb40cf
Author: Ivan Mushketyk <iv...@gmail.com>
Authored: Sat Aug 27 23:24:21 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Oct 5 14:36:17 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/Kafka08JsonTableSink.java | 1 +
.../kafka/Kafka08JsonTableSinkITCase.java | 40 -----
.../kafka/Kafka08JsonTableSinkTest.java | 48 ++++++
.../kafka/Kafka09JsonTableSinkITCase.java | 39 -----
.../kafka/Kafka09JsonTableSinkTest.java | 47 ++++++
.../connectors/kafka/KafkaTableSink.java | 1 +
.../kafka/KafkaTableSinkTestBase.java | 146 +++++++------------
7 files changed, 153 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5f869ec..b155576 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -27,6 +27,7 @@ import java.util.Properties;
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.8
*
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
deleted file mode 100644
index f870adf..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
- @Override
- protected KafkaTableSink createTableSink() {
- Kafka08JsonTableSink sink = new Kafka08JsonTableSink(
- TOPIC,
- createSinkProperties(),
- createPartitioner());
- return sink.configure(FIELD_NAMES, FIELD_TYPES);
- }
-
- protected DeserializationSchema<Row> createRowDeserializationSchema() {
- return new JsonRowDeserializationSchema(
- FIELD_NAMES, FIELD_TYPES);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
new file mode 100644
index 0000000..b1e6db9
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+ @Override
+ protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+ return new Kafka08JsonTableSink(topic, properties, partitioner) {
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+ return kafkaProducer;
+ }
+ };
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<SerializationSchema<Row>> getSerializationSchema() {
+ return (Class) JsonRowSerializationSchema.class;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
deleted file mode 100644
index 74415f8..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.table.Row;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
-
-public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase {
-
- @Override
- protected KafkaTableSink createTableSink() {
- Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
- TOPIC,
- createSinkProperties(),
- createPartitioner());
- return sink.configure(FIELD_NAMES, FIELD_TYPES);
- }
-
- protected DeserializationSchema<Row> createRowDeserializationSchema() {
- return new JsonRowDeserializationSchema(
- FIELD_NAMES, FIELD_TYPES);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
new file mode 100644
index 0000000..bfdcf68
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.util.Properties;
+
+public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+ @Override
+ protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
+ return new Kafka09JsonTableSink(topic, properties, partitioner) {
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
+ return kafkaProducer;
+ }
+ };
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<SerializationSchema<Row>> getSerializationSchema() {
+ return (Class) JsonRowSerializationSchema.class;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 8f5e811..714d9cd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -42,6 +42,7 @@ public abstract class KafkaTableSink implements StreamTableSink<Row> {
protected final KafkaPartitioner<Row> partitioner;
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
+
/**
* Creates KafkaTableSink
*
http://git-wip-us.apache.org/repos/asf/flink/blob/825ef3be/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 5e55b0a..e46ca08 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,123 +17,89 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.junit.Test;
import java.io.Serializable;
-import java.util.HashSet;
import java.util.Properties;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
-public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {
- protected final static String TOPIC = "customPartitioningTestTopic";
- protected final static int PARALLELISM = 1;
- protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
- protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+ private final static String TOPIC = "testTopic";
+ private final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
+ private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+
+ private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
+ private final Properties properties = createSinkProperties();
+ @SuppressWarnings("unchecked")
+ private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
@Test
+ @SuppressWarnings("unchecked")
public void testKafkaTableSink() throws Exception {
- LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
-
- createTestTopic(TOPIC, PARALLELISM, 1);
- StreamExecutionEnvironment env = createEnvironment();
-
- createProducingTopology(env);
- createConsumingTopology(env);
+ DataStream dataStream = mock(DataStream.class);
+ KafkaTableSink kafkaTableSink = createTableSink();
+ kafkaTableSink.emitDataStream(dataStream);
- tryExecute(env, "custom partitioning test");
- deleteTestTopic(TOPIC);
- LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
+ verify(dataStream).addSink(kafkaProducer);
}
- private StreamExecutionEnvironment createEnvironment() {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
- env.setRestartStrategy(RestartStrategies.noRestart());
- env.getConfig().disableSysoutLogging();
- return env;
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCreatedProducer() throws Exception {
+ DataStream dataStream = mock(DataStream.class);
+ KafkaTableSink kafkaTableSink = spy(createTableSink());
+ kafkaTableSink.emitDataStream(dataStream);
+
+ verify(kafkaTableSink).createKafkaProducer(
+ eq(TOPIC),
+ eq(properties),
+ any(getSerializationSchema()),
+ eq(partitioner));
}
- private void createProducingTopology(StreamExecutionEnvironment env) {
- DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
- private boolean running = true;
-
- @Override
- public void run(SourceContext<Row> ctx) throws Exception {
- long cnt = 0;
- while (running) {
- Row row = new Row(2);
- row.setField(0, cnt);
- row.setField(1, "kafka-" + cnt);
- ctx.collect(row);
- cnt++;
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- })
- .setParallelism(1);
-
- KafkaTableSink kafkaTableSinkBase = createTableSink();
-
- kafkaTableSinkBase.emitDataStream(stream);
+ @Test
+ public void testConfiguration() {
+ KafkaTableSink kafkaTableSink = createTableSink();
+ KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+ assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+ assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+ assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+ assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
- private void createConsumingTopology(StreamExecutionEnvironment env) {
- DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
-
- FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
-
- env.addSource(source).setParallelism(PARALLELISM)
- .map(new RichMapFunction<Row, Integer>() {
- @Override
- public Integer map(Row value) {
- return (Integer) value.productElement(0);
- }
- }).setParallelism(PARALLELISM)
-
- .addSink(new SinkFunction<Integer>() {
- HashSet<Integer> ids = new HashSet<>();
- @Override
- public void invoke(Integer value) throws Exception {
- ids.add(value);
-
- if (ids.size() == 100) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
- }
+ protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
+ KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
- protected KafkaPartitioner<Row> createPartitioner() {
- return new CustomPartitioner();
- }
+ protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
- protected Properties createSinkProperties() {
- return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
+ private KafkaTableSink createTableSink() {
+ return createTableSink(TOPIC, properties, partitioner, kafkaProducer);
}
- protected abstract KafkaTableSink createTableSink();
-
- protected abstract DeserializationSchema<Row> createRowDeserializationSchema();
-
+ private static Properties createSinkProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("testKey", "testValue");
+ return properties;
+ }
- public static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
+ private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
@Override
public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
return 0;