You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/06/03 16:13:57 UTC

[kafka] branch trunk updated: KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c7c988  KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)
3c7c988 is described below

commit 3c7c988e3903d87ae1072fa9be5f349e5912d98e
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Mon Jun 3 09:13:40 2019 -0700

    KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)
    
    Restart task on reconfiguration under incremental cooperative rebalancing, and keep execution paths separate for config updates between eager and cooperative. Include the group generation in the log message when the worker receives its assignment.
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     | 203 ++++++++++++++++-----
 .../integration/MonitorableSourceConnector.java    |   5 +-
 .../RebalanceSourceConnectorsIntegrationTest.java  |  63 ++++++-
 3 files changed, 220 insertions(+), 51 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 585836e..52709f7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -80,6 +80,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 
@@ -141,6 +143,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // and the from other nodes are safe to process
     private boolean rebalanceResolved;
     private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
+    private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
     private ExtendedAssignment assignment;
     private boolean canReadConfigs;
     private ClusterConfigState configState;
@@ -151,6 +154,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
+    private Set<ConnectorTaskId> taskConfigUpdates = new HashSet<>();
     // Similarly collect target state changes (when observed by the config storage listener) for handling in the
     // herder's main thread.
     private Set<String> connectorTargetStateChanges = new HashSet<>();
@@ -304,51 +308,47 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
 
         // Process any configuration updates
-        Set<String> connectorConfigUpdatesCopy = null;
-        Set<String> connectorTargetStateChangesCopy = null;
-        synchronized (this) {
-            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() || !connectorTargetStateChanges.isEmpty()) {
-                // Connector reconfigs only need local updates since there is no coordination between workers required.
-                // However, if connectors were added or removed, work needs to be rebalanced since we have more work
-                // items to distribute among workers.
-                configState = configBackingStore.snapshot();
-
-                if (needsReconfigRebalance) {
-                    // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
-                    // this loop, which will then ensure the rebalance occurs without any other requests being
-                    // processed until it completes.
-                    member.requestRejoin();
-                    // Any connector config updates or target state changes will be addressed during the rebalance too
-                    connectorConfigUpdates.clear();
-                    connectorTargetStateChanges.clear();
-                    needsReconfigRebalance = false;
-                    log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
-                            needsReconfigRebalance);
-                    return;
-                } else {
-                    if (!connectorConfigUpdates.isEmpty()) {
-                        // We can't start/stop while locked since starting connectors can cause task updates that will
-                        // require writing configs, which in turn make callbacks into this class from another thread that
-                        // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
-                        // the updates after unlocking.
-                        connectorConfigUpdatesCopy = connectorConfigUpdates;
-                        connectorConfigUpdates = new HashSet<>();
-                    }
+        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
+        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
+        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();
+
+        boolean shouldReturn;
+        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
+            shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy);
+            // With eager protocol we should return immediately if needsReconfigRebalance has
+            // been set to retain the old workflow
+            if (shouldReturn) {
+                return;
+            }
 
-                    if (!connectorTargetStateChanges.isEmpty()) {
-                        // Similarly for target state changes which can cause connectors to be restarted
-                        connectorTargetStateChangesCopy = connectorTargetStateChanges;
-                        connectorTargetStateChanges = new HashSet<>();
-                    }
-                }
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
             }
-        }
 
-        if (connectorConfigUpdatesCopy != null)
-            processConnectorConfigUpdates(connectorConfigUpdatesCopy);
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
+        } else {
+            shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
+
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
+            }
+
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
 
-        if (connectorTargetStateChangesCopy != null)
-            processTargetStateChanges(connectorTargetStateChangesCopy);
+            if (taskConfigUpdatesCopy.get() != null) {
+                processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
+            }
+
+            if (shouldReturn) {
+                return;
+            }
+        }
 
         // Let the group take any actions it needs to
         try {
@@ -360,6 +360,95 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private synchronized boolean updateConfigsWithEager(AtomicReference<Set<String>> connectorConfigUpdatesCopy,
+                                                        AtomicReference<Set<String>> connectorTargetStateChangesCopy) {
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
+                // this loop, which will then ensure the rebalance occurs without any other requests being
+                // processed until it completes.
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                // Any connector config updates or target state changes will be addressed during the rebalance too
+                connectorConfigUpdates.clear();
+                connectorTargetStateChanges.clear();
+                return true;
+            } else {
+                if (!connectorConfigUpdates.isEmpty()) {
+                    // We can't start/stop while locked since starting connectors can cause task updates that will
+                    // require writing configs, which in turn make callbacks into this class from another thread that
+                    // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
+                    // the updates after unlocking.
+                    connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                    connectorConfigUpdates = new HashSet<>();
+                }
+
+                if (!connectorTargetStateChanges.isEmpty()) {
+                    // Similarly for target state changes which can cause connectors to be restarted
+                    connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                    connectorTargetStateChanges = new HashSet<>();
+                }
+            }
+        }
+        return false;
+    }
+
+    private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicReference<Set<String>> connectorConfigUpdatesCopy,
+                                                                         AtomicReference<Set<String>> connectorTargetStateChangesCopy,
+                                                                         AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy) {
+        boolean retValue = false;
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()
+                || !taskConfigUpdates.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                retValue = true;
+            }
+
+            if (!connectorConfigUpdates.isEmpty()) {
+                // We can't start/stop while locked since starting connectors can cause task updates that will
+                // require writing configs, which in turn make callbacks into this class from another thread that
+                // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
+                // the updates after unlocking.
+                connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                connectorConfigUpdates = new HashSet<>();
+            }
+
+            if (!connectorTargetStateChanges.isEmpty()) {
+                // Similarly for target state changes which can cause connectors to be restarted
+                connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                connectorTargetStateChanges = new HashSet<>();
+            }
+
+            if (!taskConfigUpdates.isEmpty()) {
+                // Similarly for task config updates
+                taskConfigUpdatesCopy.set(taskConfigUpdates);
+                taskConfigUpdates = new HashSet<>();
+            }
+        }
+        return retValue;
+    }
+
     private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the updated connectors that are
         // currently assigned to this worker.
@@ -396,6 +485,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private void processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> taskConfigUpdates) {
+        Set<ConnectorTaskId> localTasks = assignment == null
+                                          ? Collections.emptySet()
+                                          : new HashSet<>(assignment.tasks());
+        Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
+                .map(ConnectorTaskId::connector).collect(Collectors.toSet());
+
+        List<ConnectorTaskId> tasksToStop = localTasks.stream()
+                .filter(taskId -> connectorsWhoseTasksToStop.contains(taskId.connector()))
+                .collect(Collectors.toList());
+        log.info("Handling task config update by restarting tasks {}", tasksToStop);
+        worker.stopAndAwaitTasks(tasksToStop);
+        tasksToRestart.addAll(tasksToStop);
+    }
+
     // public for testing
     public void halt() {
         synchronized (this) {
@@ -900,6 +1004,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             callables.add(getConnectorStartingCallable(connectorName));
         }
 
+        // These tasks have been stopped by this worker due to task reconfiguration. In order to
+        // restart them, they are removed just before the overall task startup from the set of
+        // currently running tasks. Therefore, they'll be restarted only if they are included in
+        // the assignment that was just received after rebalancing.
+        runningAssignment.tasks().removeAll(tasksToRestart);
+        tasksToRestart.clear();
         for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
             callables.add(getTaskStartingCallable(taskId));
         }
@@ -1172,12 +1282,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
             log.info("Tasks {} configs updated", tasks);
 
-            // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
-            // always need a rebalance to ensure offsets get committed.
+            // Stage the update and wake up the work thread.
+            // The set of tasks is recorder for incremental cooperative rebalancing, in which
+            // tasks don't get restarted unless they are balanced between workers.
+            // With eager rebalancing there's no need to record the set of tasks because task reconfigs
+            // always need a rebalance to ensure offsets get committed. In eager rebalancing the
+            // recorded set of tasks remains unused.
             // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
             // connectors clearly don't need any coordination.
             synchronized (DistributedHerder.this) {
                 needsReconfigRebalance = true;
+                taskConfigUpdates.addAll(tasks);
             }
             member.wakeup();
         }
@@ -1279,7 +1394,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
             // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
             // assigned tasks).
-            log.info("Joined group and got assignment: {}", assignment);
+            log.info("Joined group at generation {} and got assignment: {}", generation, assignment);
             synchronized (DistributedHerder.this) {
                 DistributedHerder.this.assignment = assignment;
                 DistributedHerder.this.generation = generation;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 8bc8953..2ca7698 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -44,6 +44,7 @@ import java.util.stream.LongStream;
 public class MonitorableSourceConnector extends TestSourceConnector {
     private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
 
+    public static final String TOPIC_CONFIG = "topic";
     private String connectorName;
     private ConnectorHandle connectorHandle;
     private Map<String, String> commonConfigs;
@@ -105,7 +106,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         public void start(Map<String, String> props) {
             taskId = props.get("task.id");
             connectorName = props.get("connector.name");
-            topicName = props.getOrDefault("topic", "sequential-topic");
+            topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
             throughput = Long.valueOf(props.getOrDefault("throughput", "-1"));
             batchSize = Integer.valueOf(props.getOrDefault("messages.per.poll", "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
@@ -113,7 +114,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
                     context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
-            log.info("Started {} task {}", this.getClass().getSimpleName(), taskId);
+            log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(), taskId, props);
             throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
         }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index b0125b2..d3cc8db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -39,17 +39,18 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for incremental cooperative rebalancing between Connect workers
@@ -109,7 +110,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -130,6 +131,58 @@ public class RebalanceSourceConnectorsIntegrationTest {
     }
 
     @Test
+    public void testReconfigConnector() throws Exception {
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+
+        // create test topic
+        String anotherTopic = "another-topic";
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+        connect.kafka().createTopic(anotherTopic, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        int numRecordsProduced = 100;
+        int recordTransferDurationMs = 5000;
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced
+        int recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, TOPIC_NAME).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum,
+                recordNum >= numRecordsProduced);
+
+        // Reconfigure the source connector by changing the Kafka topic used as output
+        props.put(TOPIC_CONFIG, anotherTopic);
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        // expect all records to be produced by the connector
+        connectorHandle.expectedRecords(numRecordsProduced);
+
+        // expect all records to be produced by the connector
+        connectorHandle.expectedCommits(numRecordsProduced);
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced
+        recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " + but got " + recordNum,
+                recordNum >= numRecordsProduced);
+    }
+
+    @Test
     public void testDeleteConnector() throws Exception {
         // create test topic
         connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
@@ -140,7 +193,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -181,7 +234,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -224,7 +277,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());