You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/10 09:25:12 UTC
flink git commit: [FLINK-4745] [table] Convert KafkaTableSource test to unit tests
Repository: flink
Updated Branches:
refs/heads/master b949d42d9 -> 5e30ba384
[FLINK-4745] [table] Convert KafkaTableSource test to unit tests
This closes #2603.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e30ba38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e30ba38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e30ba38
Branch: refs/heads/master
Commit: 5e30ba384934b08861a970744db64b5123cfeff8
Parents: b949d42
Author: twalthr <tw...@apache.org>
Authored: Wed Oct 5 16:55:20 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Oct 10 11:22:19 2016 +0200
----------------------------------------------------------------------
.../connectors/kafka/Kafka08ITCase.java | 65 -------------
.../kafka/Kafka08JsonTableSinkTest.java | 2 +-
.../kafka/Kafka08JsonTableSourceTest.java | 45 +++++++++
.../connectors/kafka/Kafka09ITCase.java | 79 ----------------
.../kafka/Kafka09JsonTableSinkTest.java | 1 +
.../kafka/Kafka09JsonTableSourceTest.java | 45 +++++++++
.../connectors/kafka/KafkaConsumerTestBase.java | 98 --------------------
.../kafka/KafkaTableSinkTestBase.java | 38 +++-----
.../kafka/KafkaTableSourceTestBase.java | 77 +++++++++++++++
9 files changed, 184 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 467ccc5..1c69d78 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -33,7 +31,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
@@ -368,66 +365,4 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
curatorFramework.close();
}
-
- @Test
- public void testJsonTableSource() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
- topic,
- standardProps,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(false);
-
- runJsonTableSource(topic, tableSource);
- }
-
- @Test
- public void testJsonTableSourceWithFailOnMissingField() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
- topic,
- standardProps,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(true);
-
- try {
- runJsonTableSource(topic, tableSource);
- fail("Did not throw expected Exception");
- } catch (Exception e) {
- Throwable rootCause = e.getCause().getCause().getCause();
- assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index b1e6db9..446e1d7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -28,7 +28,7 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
return new Kafka08JsonTableSink(topic, properties, partitioner) {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
new file mode 100644
index 0000000..a2d66ac
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+ @Override
+ protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+ return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+ return (Class) JsonRowDeserializationSchema.class;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer08.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 16ddcdc..fd167a0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -17,16 +17,8 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.junit.Test;
-import java.util.Properties;
-import java.util.UUID;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class Kafka09ITCase extends KafkaConsumerTestBase {
// ------------------------------------------------------------------------
@@ -126,75 +118,4 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
public void testMetrics() throws Throwable {
runMetricsTest();
}
-
- @Test
- public void testJsonTableSource() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
- topic,
- props,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(false);
-
- runJsonTableSource(topic, tableSource);
- }
-
- @Test
- public void testJsonTableSourceWithFailOnMissingField() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
-
- // Names and types are determined in the actual test method of the
- // base test class.
- Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
- topic,
- props,
- new String[] {
- "long",
- "string",
- "boolean",
- "double",
- "missing-field"},
- new TypeInformation<?>[] {
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO });
-
- // Don't fail on missing field, but set to null (default)
- tableSource.setFailOnMissingField(true);
-
- try {
- runJsonTableSource(topic, tableSource);
- fail("Did not throw expected Exception");
- } catch (Exception e) {
- Throwable rootCause = e.getCause().getCause().getCause();
- assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index bfdcf68..068640d 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -29,6 +29,7 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
return new Kafka09JsonTableSink(topic, properties, partitioner) {
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
new file mode 100644
index 0000000..4a75f50
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+
+public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+ @Override
+ protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
+ return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+ return (Class) JsonRowDeserializationSchema.class;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+ return (Class) FlinkKafkaConsumer09.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 9c36b43..bafff4f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -787,104 +787,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
/**
- * Runs a table source test with JSON data.
- *
- * The table source needs to parse the following JSON fields:
- * - "long" -> number
- * - "string" -> "string"
- * - "boolean" -> true|false
- * - "double" -> fraction
- */
- public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
- final ObjectMapper mapper = new ObjectMapper();
-
- final int numElements = 1024;
- final long[] longs = new long[numElements];
- final String[] strings = new String[numElements];
- final boolean[] booleans = new boolean[numElements];
- final double[] doubles = new double[numElements];
-
- final byte[][] serializedJson = new byte[numElements][];
-
- ThreadLocalRandom random = ThreadLocalRandom.current();
-
- for (int i = 0; i < numElements; i++) {
- longs[i] = random.nextLong();
- strings[i] = Integer.toHexString(random.nextInt());
- booleans[i] = random.nextBoolean();
- doubles[i] = random.nextDouble();
-
- ObjectNode entry = mapper.createObjectNode();
- entry.put("long", longs[i]);
- entry.put("string", strings[i]);
- entry.put("boolean", booleans[i]);
- entry.put("double", doubles[i]);
-
- serializedJson[i] = mapper.writeValueAsBytes(entry);
- }
-
- // Produce serialized JSON data
- createTestTopic(topic, 1, 1);
-
- Properties props = new Properties();
- props.putAll(standardProps);
- props.putAll(secureProps);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment
- .createRemoteEnvironment("localhost", flinkPort);
- env.getConfig().disableSysoutLogging();
-
- env.addSource(new SourceFunction<byte[]>() {
- @Override
- public void run(SourceContext<byte[]> ctx) throws Exception {
- for (int i = 0; i < numElements; i++) {
- ctx.collect(serializedJson[i]);
- }
- }
-
- @Override
- public void cancel() {
- }
- }).addSink(kafkaServer.getProducer(
- topic,
- new ByteArraySerializationSchema(),
- props,
- null));
-
- // Execute blocks
- env.execute();
-
- // Register as table source
- StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
- tableEnvironment.registerTableSource("kafka", kafkaTableSource);
-
- Table result = tableEnvironment.ingest("kafka");
-
- tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
-
- int i = 0;
-
- @Override
- public void invoke(Row value) throws Exception {
- assertEquals(5, value.productArity());
- assertEquals(longs[i], value.productElement(0));
- assertEquals(strings[i], value.productElement(1));
- assertEquals(booleans[i], value.productElement(2));
- assertEquals(doubles[i], value.productElement(3));
- assertNull(value.productElement(4));
-
- if (i == numElements-1) {
- throw new SuccessException();
- } else {
- i++;
- }
- }
- });
-
- tryExecutePropagateExceptions(env, "KafkaTableSource");
- }
-
- /**
* Serialization scheme forwarding byte[] records.
*/
private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e46ca08..baddab1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -38,39 +38,31 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
-public abstract class KafkaTableSinkTestBase implements Serializable {
-
- private final static String TOPIC = "testTopic";
- private final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
- private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
-
- private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
- private final Properties properties = createSinkProperties();
+public abstract class KafkaTableSinkTestBase {
+
+ private static final String TOPIC = "testTopic";
+ private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+ private static final TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+ private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+ private static final Properties PROPERTIES = createSinkProperties();
+ // we have to mock FlinkKafkaProducerBase as it cannot be instantiated without Kafka
@SuppressWarnings("unchecked")
- private final FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
+ private static final FlinkKafkaProducerBase<Row> PRODUCER = mock(FlinkKafkaProducerBase.class);
@Test
@SuppressWarnings("unchecked")
public void testKafkaTableSink() throws Exception {
DataStream dataStream = mock(DataStream.class);
- KafkaTableSink kafkaTableSink = createTableSink();
- kafkaTableSink.emitDataStream(dataStream);
-
- verify(dataStream).addSink(kafkaProducer);
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testCreatedProducer() throws Exception {
- DataStream dataStream = mock(DataStream.class);
KafkaTableSink kafkaTableSink = spy(createTableSink());
kafkaTableSink.emitDataStream(dataStream);
+ verify(dataStream).addSink(eq(PRODUCER));
+
verify(kafkaTableSink).createKafkaProducer(
eq(TOPIC),
- eq(properties),
+ eq(PROPERTIES),
any(getSerializationSchema()),
- eq(partitioner));
+ eq(PARTITIONER));
}
@Test
@@ -90,12 +82,12 @@ public abstract class KafkaTableSinkTestBase implements Serializable {
protected abstract Class<SerializationSchema<Row>> getSerializationSchema();
private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, properties, partitioner, kafkaProducer);
+ return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
}
private static Properties createSinkProperties() {
Properties properties = new Properties();
- properties.setProperty("testKey", "testValue");
+ properties.setProperty("bootstrap.servers", "localhost:12345");
return properties;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5e30ba38/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
new file mode 100644
index 0000000..2a281e8
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import java.util.Properties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaTableSourceTestBase {
+
+ private static final String TOPIC = "testTopic";
+ private static final String[] FIELD_NAMES = new String[] { "long", "string", "boolean", "double", "missing-field" };
+ private static final TypeInformation<?>[] FIELD_TYPES = new TypeInformation<?>[] {
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO };
+ private static final Properties PROPERTIES = createSourceProperties();
+
+ @Test
+ public void testKafkaTableSource() {
+ KafkaTableSource kafkaTableSource = spy(createTableSource());
+ StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
+ kafkaTableSource.getDataStream(env);
+
+ verify(env).addSource(any(getFlinkKafkaConsumer()));
+
+ verify(kafkaTableSource).getKafkaConsumer(
+ eq(TOPIC),
+ eq(PROPERTIES),
+ any(getDeserializationSchema()));
+ }
+
+ protected abstract KafkaTableSource createTableSource(String topic, Properties properties,
+ String[] fieldNames, TypeInformation<?>[] typeInfo);
+
+ protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
+
+ protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
+
+ private KafkaTableSource createTableSource() {
+ return createTableSource(TOPIC, PROPERTIES, FIELD_NAMES, FIELD_TYPES);
+ }
+
+ private static Properties createSourceProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("zookeeper.connect", "dummy");
+ properties.setProperty("group.id", "dummy");
+ return properties;
+ }
+}