You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/16 04:42:25 UTC
flink git commit: [FLINK-5701] [kafka] FlinkKafkaProducer should
check asyncException on checkpoints
Repository: flink
Updated Branches:
refs/heads/release-1.1 e296acae5 -> 6662cc643
[FLINK-5701] [kafka] FlinkKafkaProducer should check asyncException on checkpoints
This closes #3549.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6662cc64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6662cc64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6662cc64
Branch: refs/heads/release-1.1
Commit: 6662cc64332a7c08efd7672d3abea3176529d774
Parents: e296aca
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Mar 16 00:05:51 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Mar 16 12:41:20 2017 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaProducerBase.java | 18 +-
.../kafka/AtLeastOnceProducerTest.java | 422 +++++++++++++------
.../testutils/FakeStandardProducerConfig.java | 34 ++
3 files changed, 339 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index e63f033..ea9caeb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -107,7 +107,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
/**
* If true, the producer will wait until all outstanding records have been send to the broker.
*/
- private boolean flushOnCheckpoint;
+ protected boolean flushOnCheckpoint;
// -------------------------------- Runtime fields ------------------------------------------
@@ -330,7 +330,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
protected abstract void flush();
@Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ // check for asynchronous errors and fail the checkpoint if necessary
+ checkErroneous();
+
if (flushOnCheckpoint) {
// flushing is activated: We need to wait until pendingRecords is 0
flush();
@@ -338,7 +341,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
if (pendingRecords != 0) {
throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
}
- // pending records count is 0. We can now confirm the checkpoint
+
+ // if any of the flushed requests has an error, we should also propagate it and fail the checkpoint
+ checkErroneous();
}
}
// return empty state
@@ -374,4 +379,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
+
+ // this is exposed for testing purposes
+ protected long numPendingRecords() {
+ synchronized (pendingRecordsLock) {
+ return pendingRecords;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index b02593c..3dabceb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -18,32 +18,36 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Test ensuring that the producer is not dropping buffered records
@@ -51,168 +55,322 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("unchecked")
public class AtLeastOnceProducerTest {
- // we set a timeout because the test will not finish if the logic is broken
- @Test(timeout=5000)
- public void testAtLeastOnceProducer() throws Throwable {
- runTest(true);
- }
+ /**
+ * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
+ */
+ @Test
+ public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
- // This test ensures that the actual test fails if the flushing is disabled
- @Test(expected = AssertionError.class, timeout=5000)
- public void ensureTestFails() throws Throwable {
- runTest(false);
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ // let the message request return an async exception
+ producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+ try {
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ } catch (Exception e) {
+ // the next invoke should rethrow the async exception
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+ // test succeeded
+ return;
+ }
+
+ Assert.fail();
}
- private void runTest(boolean flushOnCheckpoint) throws Throwable {
- Properties props = new Properties();
- final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
- final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
- snapshottingFinished);
- producer.setFlushOnCheckpoint(flushOnCheckpoint);
- producer.setRuntimeContext(new MockRuntimeContext(0, 1));
+ /**
+ * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
+ */
+ @Test
+ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
- producer.open(new Configuration());
+ testHarness.open();
- for (int i = 0; i < 100; i++) {
- producer.invoke("msg-" + i);
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+
+ // let the message request return an async exception
+ producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ // the snapshot should rethrow the async exception
+ Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+ // test succeeded
+ return;
}
- // start a thread confirming all pending records
- final Tuple1<Throwable> runnableError = new Tuple1<>(null);
- final Thread threadA = Thread.currentThread();
- Runnable confirmer = new Runnable() {
+ Assert.fail();
+ }
+
+ /**
+ * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+ * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
+ *
+ * Note that this test does not test that the snapshot method is blocked correctly when there are pending records.
+ * The test for that is covered in testAtLeastOnceProducer.
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=5000)
+ public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+ producer.setFlushOnCheckpoint(true);
+
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
+
+ verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+
+ // only let the first callback succeed for now
+ producer.getPendingCallbacks().get(0).onCompletion(null, null);
+
+ final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+ Thread snapshotThread = new Thread(new Runnable() {
@Override
public void run() {
try {
- MockProducer mp = producer.getProducerInstance();
- List<Callback> pending = mp.getPending();
-
- // we need to find out if the snapshot() method blocks forever
- // this is not possible. If snapshot() is running, it will
- // start removing elements from the pending list.
- synchronized (threadA) {
- threadA.wait(500L);
- }
- // we now check that no records have been confirmed yet
- Assert.assertEquals(100, pending.size());
- Assert.assertFalse("Snapshot method returned before all records were confirmed",
- snapshottingFinished.get());
-
- // now confirm all checkpoints
- for (Callback c: pending) {
- c.onCompletion(null, null);
- }
- pending.clear();
- } catch(Throwable t) {
- runnableError.f0 = t;
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ exceptionRef.compareAndSet(null, e);
}
}
- };
- Thread threadB = new Thread(confirmer);
- threadB.start();
- // this should block:
- producer.snapshotState(0, 0);
- synchronized (threadA) {
- threadA.notifyAll(); // just in case, to let the test fail faster
- }
- Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
- Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
- while (deadline.hasTimeLeft() && threadB.isAlive()) {
- threadB.join(500);
- }
- Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
- if (runnableError.f0 != null) {
- throw runnableError.f0;
- }
+ });
+ snapshotThread.start();
+
+ // let the 2nd message fail with an async exception
+ producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
+ producer.getPendingCallbacks().get(2).onCompletion(null, null);
+
+ snapshotThread.join();
- producer.close();
+ // the snapshot should have failed with the async exception
+ Exception snapshotError = exceptionRef.get();
+ assertTrue(snapshotError != null);
+ assertTrue(snapshotError.getCause().getMessage().contains("artificial async failure for 2nd message"));
}
+ /**
+ * Test ensuring that the producer is not dropping buffered records;
+ * we set a timeout because the test will not finish if the logic is broken
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testAtLeastOnceProducer() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
- private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
- private MockProducer prod;
- private AtomicBoolean snapshottingFinished;
+ // enable flushing
+ producer.setFlushOnCheckpoint(true);
- public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) {
- super(defaultTopicId, serializationSchema, producerConfig, null);
- this.snapshottingFinished = snapshottingFinished;
- }
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
- @Override
- protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
- this.prod = new MockProducer();
- return this.prod;
- }
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
- @Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
- // call the actual snapshot state
- Serializable ret = super.snapshotState(checkpointId, checkpointTimestamp);
- // notify test that snapshotting has been done
- snapshottingFinished.set(true);
- return ret;
- }
+ testHarness.open();
- @Override
- protected void flush() {
- this.prod.flush();
- }
+ testHarness.processElement(new StreamRecord<>("msg-1"));
+ testHarness.processElement(new StreamRecord<>("msg-2"));
+ testHarness.processElement(new StreamRecord<>("msg-3"));
- public MockProducer getProducerInstance() {
- return this.prod;
- }
+ verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+ Assert.assertEquals(3, producer.getPendingSize());
+
+ // start a thread to perform checkpointing
+ final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+ Thread snapshotThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // this should block until all records are flushed;
+ // if the snapshot implementation returns before pending records are flushed, the snapshot fails with an exception
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ exceptionRef.compareAndSet(null, e);
+ }
+ }
+ });
+ snapshotThread.start();
+
+ // before proceeding, make sure that flushing has started and that the snapshot is still blocked;
+ // this would block forever if the snapshot didn't perform a flush
+ producer.waitUntilFlushStarted();
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+
+ // now, complete the callbacks
+ producer.getPendingCallbacks().get(0).onCompletion(null, null);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+ Assert.assertEquals(2, producer.getPendingSize());
+
+ producer.getPendingCallbacks().get(1).onCompletion(null, null);
+ Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
+ Assert.assertEquals(1, producer.getPendingSize());
+
+ producer.getPendingCallbacks().get(2).onCompletion(null, null);
+ Assert.assertEquals(0, producer.getPendingSize());
+
+ snapshotThread.join();
+
+ // snapshot would fail with an exception if flushing wasn't completed before the snapshot method returned;
+ // make sure this did not happen
+ assertTrue(exceptionRef.get() == null);
+
+ testHarness.close();
}
- private static class MockProducer<K, V> extends KafkaProducer<K, V> {
- List<Callback> pendingCallbacks = new ArrayList<>();
+ /**
+ * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
+ * the snapshot method does indeed finish without waiting for pending records;
+ * we set a timeout because the test will not finish if the logic is broken
+ */
+ @SuppressWarnings("unchecked")
+ @Test(timeout=5000)
+ public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null);
+ producer.setFlushOnCheckpoint(false);
+
+ final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>("msg"));
+
+ // make sure that the record was sent; its callback is intentionally left uncompleted
+ verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
+
+ // should return even if there are pending records
+ testHarness.snapshot(123L, 123L);
+
+ testHarness.close();
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
+ private static final long serialVersionUID = 1L;
+
+ private final static String DUMMY_TOPIC = "dummy-topic";
+
+ private transient KafkaProducer<?, ?> mockProducer;
+ private transient List<Callback> pendingCallbacks;
+ private transient MultiShotLatch flushLatch;
+ private boolean isFlushed;
- private static Properties getFakeProperties() {
- Properties p = new Properties();
- p.setProperty("bootstrap.servers", "localhost:12345");
- p.setProperty("key.serializer", ByteArraySerializer.class.getName());
- p.setProperty("value.serializer", ByteArraySerializer.class.getName());
- return p;
+ @SuppressWarnings("unchecked")
+ DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+
+ super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+
+ this.pendingCallbacks = new ArrayList<>();
+ this.flushLatch = new MultiShotLatch();
}
- public MockProducer() {
- super(getFakeProperties());
+
+ long getPendingSize() {
+ if (flushOnCheckpoint) {
+ return numPendingRecords();
+ } else {
+ // when flushing is disabled, the implementation does not
+ // maintain the current number of pending records to reduce
+ // the extra locking overhead required to do so
+ throw new UnsupportedOperationException("getPendingSize not supported when flushing is disabled");
+ }
}
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
- throw new UnsupportedOperationException("Unexpected");
+ List<Callback> getPendingCallbacks() {
+ return pendingCallbacks;
}
- @Override
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
- pendingCallbacks.add(callback);
- return null;
+ KafkaProducer<?, ?> getMockKafkaProducer() {
+ return mockProducer;
}
@Override
- public List<PartitionInfo> partitionsFor(String topic) {
- List<PartitionInfo> list = new ArrayList<>();
- list.add(new PartitionInfo(topic, 0, null, null, null));
- return list;
+ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ isFlushed = false;
+
+ Serializable snapshot = super.snapshotState(checkpointId, checkpointTimestamp);
+
+ // if the snapshot implementation doesn't wait until all pending records are flushed, we should fail the test
+ if (flushOnCheckpoint && !isFlushed) {
+ throw new RuntimeException("Flushing is enabled; snapshots should be blocked until all pending records are flushed");
+ }
+
+ return snapshot;
}
- @Override
- public Map<MetricName, ? extends Metric> metrics() {
- return null;
+ public void waitUntilFlushStarted() throws Exception {
+ flushLatch.await();
}
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+ if (this.mockProducer == null) {
+ this.mockProducer = mock(KafkaProducer.class);
+ when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ pendingCallbacks.add((Callback) invocationOnMock.getArguments()[1]);
+ return null;
+ }
+ });
+ }
- public List<Callback> getPending() {
- return this.pendingCallbacks;
+ return (KafkaProducer<K, V>) this.mockProducer;
}
- public void flush() {
- while (pendingCallbacks.size() > 0) {
+ @Override
+ protected void flush() {
+ flushLatch.trigger();
+
+ // simply wait until the producer's pending records become zero.
+ // This relies on the fact that the producer's Callback implementation
+ // and pending records tracking logic are implemented correctly, otherwise
+ // we will loop forever.
+ while (numPendingRecords() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to flush producer, task was interrupted");
}
}
+
+ isFlushed = true;
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ StreamingRuntimeContext runtimeContext = mock(StreamingRuntimeContext.class);
+ when(runtimeContext.isCheckpointingEnabled()).thenReturn(true);
+ when(runtimeContext.getMetricGroup()).thenReturn(mock(MetricGroup.class));
+ return runtimeContext;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6662cc64/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
new file mode 100644
index 0000000..055326d
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FakeStandardProducerConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.util.Properties;
+
+public class FakeStandardProducerConfig {
+
+ public static Properties get() {
+ Properties p = new Properties();
+ p.setProperty("bootstrap.servers", "localhost:12345");
+ p.setProperty("key.serializer", ByteArraySerializer.class.getName());
+ p.setProperty("value.serializer", ByteArraySerializer.class.getName());
+ return p;
+ }
+
+}