Posted to commits@kafka.apache.org by jq...@apache.org on 2017/11/06 02:00:56 UTC

kafka git commit: KAFKA-6157; Fix repeated words words in JavaDoc and comments.

Repository: kafka
Updated Branches:
  refs/heads/trunk 520b31362 -> 86062e9a7


KAFKA-6157; Fix repeated words words in JavaDoc and comments.

Author: Adem Efe Gencer <ag...@linkedin.com>

Reviewers: Jiangjie Qin <be...@gmail.com>

Closes #4170 from efeg/bug/typoFix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86062e9a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86062e9a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86062e9a

Branch: refs/heads/trunk
Commit: 86062e9a78dccad74e012f11755025512ad5cf63
Parents: 520b313
Author: Adem Efe Gencer <ag...@linkedin.com>
Authored: Sun Nov 5 18:00:43 2017 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun Nov 5 18:00:43 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/InFlightRequests.java |  4 ++--
 .../java/org/apache/kafka/clients/NetworkClient.java    |  2 +-
 .../apache/kafka/clients/consumer/KafkaConsumer.java    |  8 ++++----
 .../apache/kafka/clients/producer/KafkaProducer.java    |  2 +-
 .../kafka/common/record/MemoryRecordsBuilder.java       |  2 +-
 .../security/authenticator/SaslClientAuthenticator.java |  2 +-
 .../connect/runtime/distributed/DistributedHerder.java  |  2 +-
 .../org/apache/kafka/connect/util/KafkaBasedLog.java    |  2 +-
 .../connect/storage/KafkaConfigBackingStoreTest.java    |  4 ++--
 core/src/main/scala/kafka/cluster/Partition.scala       |  2 +-
 .../scala/kafka/coordinator/group/GroupMetadata.scala   |  3 ++-
 .../transaction/TransactionStateManager.scala           |  2 +-
 .../main/scala/kafka/server/DelayedDeleteRecords.scala  |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala           |  2 +-
 .../integration/kafka/api/ProducerBounceTest.scala      |  2 +-
 .../unit/kafka/admin/DeleteConsumerGroupTest.scala      |  2 +-
 core/src/test/scala/unit/kafka/log/LogManagerTest.scala |  2 +-
 .../scala/unit/kafka/network/SocketServerTest.scala     |  2 +-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 docs/connect.html                                       |  2 +-
 docs/documentation/streams/architecture.html            |  2 +-
 docs/documentation/streams/core-concepts.html           |  2 +-
 docs/documentation/streams/developer-guide.html         |  2 +-
 docs/documentation/streams/index.html                   |  2 +-
 docs/documentation/streams/quickstart.html              |  2 +-
 docs/documentation/streams/tutorial.html                |  2 +-
 docs/documentation/streams/upgrade-guide.html           |  2 +-
 docs/implementation.html                                |  2 +-
 .../java/org/apache/kafka/streams/StreamsBuilder.java   | 10 +++++-----
 .../apache/kafka/streams/kstream/KStreamBuilder.java    | 12 ++++++------
 .../streams/kstream/internals/KStreamAggregate.java     |  2 +-
 .../kafka/streams/kstream/internals/KStreamReduce.java  |  2 +-
 .../kstream/internals/KStreamWindowAggregate.java       |  2 +-
 .../streams/kstream/internals/KStreamWindowReduce.java  |  2 +-
 .../streams/kstream/internals/KTableAggregate.java      |  2 +-
 .../kafka/streams/kstream/internals/KTableReduce.java   |  2 +-
 .../kafka/streams/processor/internals/StreamTask.java   |  2 +-
 tests/setup.cfg                                         |  2 +-
 tests/unit/setup.cfg                                    |  2 +-
 .../org/apache/kafka/tools/ProducerPerformance.java     |  2 +-
 40 files changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f977329..3689a09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -60,7 +60,7 @@ final class InFlightRequests {
     }
 
     /**
-     * Get the oldest request (the one that that will be completed next) for the given node
+     * Get the oldest request (the one that will be completed next) for the given node
      */
     public NetworkClient.InFlightRequest completeNext(String node) {
         return requestQueue(node).pollLast();
@@ -167,5 +167,5 @@ final class InFlightRequests {
         }
         return nodeIds;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index ee7258a..0654a91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -369,7 +369,7 @@ public class NetworkClient implements KafkaClient {
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
             // that we can send data.  If the request is internal, we trust
-            // that that internal code has done this validation.  Validation
+            // that internal code has done this validation.  Validation
             // will be slightly different for some internal requests (for
             // example, ApiVersionsRequests can be sent prior to being in
             // READY state.)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 05bca22..e9499cb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -423,7 +423,7 @@ import java.util.regex.Pattern;
  * <p>
  * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
  * In order for this to work, consumers reading from these partitions should be configured to only read committed data.
- * This can be achieved by by setting the {@code isolation.level=read_committed} in the consumer's configuration.
+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
  *
  * <p>
  * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
@@ -704,9 +704,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
-            
-            int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); 
-            
+
+            int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
+
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                     this.metadata,
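
As an aside for readers of the KafkaConsumer JavaDoc touched above: the read_committed isolation level it describes is set through ordinary consumer configuration. A minimal sketch, assuming a placeholder bootstrap address, group id, and topic name:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder address
            props.put("group.id", "example-group");             // hypothetical group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Read only messages from committed transactions, as the JavaDoc describes.
            props.put("isolation.level", "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("transactional-topic")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(100L);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("%s -> %s%n", record.key(), record.value());
            }
        }
    }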

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8004180..b3cff19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -951,7 +951,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * </p>
      * <p>
      * Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
-     * flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)}
+     * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
      * calls made since the previous {@link #beginTransaction()} are completed before the commit.
      * </p>
      *
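
For context on the flush() JavaDoc corrected above: a transactional producer does not need an explicit flush(), because commitTransaction() flushes any buffered records before committing. A minimal sketch, assuming a placeholder bootstrap address, transactional id, and topic name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");          // placeholder address
            props.put("transactional.id", "example-transactional-id"); // hypothetical id; enables transactions
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("example-topic", "key", "value")); // placeholder topic
                // No flush() needed: commitTransaction() flushes buffered records before the commit.
                producer.commitTransaction();
            }
        }
    }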

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index ad0bab7..a9b57ac 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -384,7 +384,7 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Append a record and return its checksum for message format v0 and v1, or null for for v2 and above.
+     * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
      */
     private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                   ByteBuffer value, Header[] headers) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index b01ae4c..8b01165 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -186,7 +186,7 @@ public class SaslClientAuthenticator implements Authenticator {
                     if (authenticateVersion != null)
                         saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
                     setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
-                    // Fall through to send send handshake request with the latest supported version
+                    // Fall through to send handshake request with the latest supported version
                 }
             case SEND_HANDSHAKE_REQUEST:
                 SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 4d3d07b..79d32da 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1213,7 +1213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
             log.info("Rebalance started");
 
-            // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance,
+            // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
             // it is still important to have a leader that can write configs, offsets, etc.
 
             if (rebalanceResolved) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 0e190bc..de1ceb3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -316,7 +316,7 @@ public class KafkaBasedLog<K, V> {
 
                     synchronized (KafkaBasedLog.this) {
                         // Only invoke exactly the number of callbacks we found before triggering the read to log end
-                        // since it is possible for another write + readToEnd to sneak in in the meantime
+                        // since it is possible for another write + readToEnd to sneak in the meantime
                         for (int i = 0; i < numCallbacks; i++) {
                             Callback<Void> cb = readLogEndOffsetCallbacks.poll();
                             cb.onCompletion(null, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index e9dd18e..aac1b78 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -282,7 +282,7 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.taskConfig(TASK_IDS.get(0)));
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
         configStorage.putTaskConfigs("connector1", taskConfigs);
@@ -335,7 +335,7 @@ public class KafkaConfigBackingStoreTest {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(-1, configState.offset());
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Collections.emptyList();
         configStorage.putTaskConfigs("connector1", taskConfigs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 634d2d5..e29467d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -325,7 +325,7 @@ class Partition(val topic: String,
   }
 
   /**
-   * Update the the follower's state in the leader based on the last fetch request. See
+   * Update the follower's state in the leader based on the last fetch request. See
    * [[kafka.cluster.Replica#updateLogReadResult]] for details.
    *
    * @return true if the leader's log start offset or high watermark have been updated

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 9019461..42ca6ea 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -380,7 +380,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
         pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
           if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
             throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
-              s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
+              s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
 
           val currentOffsetOpt = offsets.get(topicPartition)
           if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
@@ -405,6 +405,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
+
       pendingOffsetCommits.remove(topicPartition)
       pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
         pendingOffsets.remove(topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index b962e82..e79a6e3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -601,7 +601,7 @@ class TransactionStateManager(brokerId: Int,
 
           val append: Boolean = metadata.inLock {
             if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
-              // the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
+              // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
               responseCallback(Errors.NOT_COORDINATOR)
               false
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index a6a8202..7a00bc1 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -62,7 +62,7 @@ class DelayedDeleteRecords(delayMs: Long,
   /**
    * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
    *
-   * 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response
+   * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response
    * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
    *
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index c122141..4a6a348 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -67,7 +67,7 @@ object JmxTool extends Logging {
       .describedAs("format")
       .ofType(classOf[String])
     val jmxServiceUrlOpt =
-      parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
+      parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
         .withRequiredArg
         .describedAs("service-url")
         .ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 1bde7b1..ab13b0a 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -60,7 +60,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
 
   /**
-   * With replication, producer should able able to find new leader after it detects broker failure
+   * With replication, producer should able to find new leader after it detects broker failure
    */
   @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index a8955f5..ba11471 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -141,7 +141,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)
 
     TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
-      "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+      "Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
     TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
       "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 6544d43..4c1f4ae 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -211,7 +211,7 @@ class LogManagerTest {
       log.appendAsLeader(set, leaderEpoch = 0)
     }
     time.sleep(logManager.InitialTaskDelayMs)
-    assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
+    assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index aebbf5c..0c05c46 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed")
       TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel removed without processing staged receives")
 
-      // Create new connection with same id when when `channel1` is in Selector.closingChannels
+      // Create new connection with same id when `channel1` is in Selector.closingChannels
       // Check that new connection is closed and openOrClosingChannel still contains `channel1`
       connectAndWaitForConnectionRegister()
       TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a99cd50..360b9dc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1431,7 +1431,7 @@ object TestUtils extends Logging {
 
   private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)
 
-  // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status
+  // Verifies that the record was intended to be committed by checking the headers for an expected transaction status
   // If true, this will return the value as a string. It is expected that the record in question should have been created
   // by the `producerRecordWithExpectedTransactionStatus` method.
   def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index 886e348..78c66b1 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -329,7 +329,7 @@
         }
     </pre>
 
-    <p>These are slightly simplified versions, but show that that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
+    <p>These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
 
     <p>Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List&lt;SourceRecord&gt;</code>:</p>
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/architecture.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/architecture.html b/docs/documentation/streams/architecture.html
index 6ba69f3..ad7b323 100644
--- a/docs/documentation/streams/architecture.html
+++ b/docs/documentation/streams/architecture.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/architecture.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/core-concepts.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/core-concepts.html b/docs/documentation/streams/core-concepts.html
index ff46c53..d699b79 100644
--- a/docs/documentation/streams/core-concepts.html
+++ b/docs/documentation/streams/core-concepts.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/core-concepts.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/developer-guide.html b/docs/documentation/streams/developer-guide.html
index e258331..8812737 100644
--- a/docs/documentation/streams/developer-guide.html
+++ b/docs/documentation/streams/developer-guide.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/developer-guide.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/index.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/index.html b/docs/documentation/streams/index.html
index 1aaaff4..5ff3b3b 100644
--- a/docs/documentation/streams/index.html
+++ b/docs/documentation/streams/index.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/index.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/quickstart.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/quickstart.html b/docs/documentation/streams/quickstart.html
index f69c0d5..efb0234 100644
--- a/docs/documentation/streams/quickstart.html
+++ b/docs/documentation/streams/quickstart.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/quickstart.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/tutorial.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/tutorial.html b/docs/documentation/streams/tutorial.html
index 90f408d..e2cf401 100644
--- a/docs/documentation/streams/tutorial.html
+++ b/docs/documentation/streams/tutorial.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/tutorial.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/documentation/streams/upgrade-guide.html
----------------------------------------------------------------------
diff --git a/docs/documentation/streams/upgrade-guide.html b/docs/documentation/streams/upgrade-guide.html
index 0c68795..b1b3200 100644
--- a/docs/documentation/streams/upgrade-guide.html
+++ b/docs/documentation/streams/upgrade-guide.html
@@ -15,5 +15,5 @@
  limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/upgrade-guide.html" -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/docs/implementation.html
----------------------------------------------------------------------
diff --git a/docs/implementation.html b/docs/implementation.html
index af234ea..8b97aa0 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -99,7 +99,7 @@
 		headerValueLength: varint
 		Value: byte[]
 	</pre></p>
-    <p>We use the the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
+    <p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
     is also encoded as a varint.</p>
 
     <h3><a id="log" href="#log">5.4 Log</a></h3>
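
The varint encoding mentioned in the hunk above is the Protobuf scheme: seven payload bits per byte, with the high bit set while more bytes follow. A small illustrative encoder, not Kafka's internal implementation:

    import java.io.ByteArrayOutputStream;

    public class VarintExample {
        // Encode an unsigned int with 7 bits per byte; the high bit means "more bytes follow".
        static byte[] writeUnsignedVarint(int value) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            while ((value & 0xFFFFFF80) != 0) {
                out.write((value & 0x7F) | 0x80);
                value >>>= 7;
            }
            out.write(value);
            return out.toByteArray();
        }

        public static void main(String[] args) {
            // 300 encodes to the two bytes 0xAC 0x02, as in the Protobuf documentation.
            for (byte b : writeUnsignedVarint(300))
                System.out.printf("0x%02X ", b);
            System.out.println();
        }
    }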

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index b5cc6d7..0aac45a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -59,7 +59,7 @@ public class StreamsBuilder {
     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
 
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
-    
+
     /**
      * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
@@ -237,7 +237,7 @@ public class StreamsBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      *
@@ -258,7 +258,7 @@ public class StreamsBuilder {
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      *
@@ -312,7 +312,7 @@ public class StreamsBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -343,7 +343,7 @@ public class StreamsBuilder {
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
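
The StreamsBuilder JavaDoc fixed above concerns the table() overloads that materialize into an internally named store. A minimal sketch of the simplest overload, with a placeholder topic name:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class TableExample {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // Materialized into a KeyValueStore with an internal, auto-generated store name,
            // which may not be reachable through Interactive Queries.
            KTable<String, String> table = builder.table("example-topic"); // placeholder topic
            System.out.println(builder.build().describe());
        }
    }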

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 77745d3..d747ce8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -444,7 +444,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * @param topic     the topic name; cannot be {@code null}
@@ -537,7 +537,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -714,7 +714,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -908,7 +908,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -1007,7 +1007,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>
@@ -1196,7 +1196,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
+     * store name. Note that store name may not be queriable through Interactive Queries.
      * No internal changelog topic is created since the original input topic can be used for recovery (cf.
      * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
      * <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 67b65a3..b1abdc2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -74,7 +74,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
             T newAgg = oldAgg;
 
-            // try to add the new new value
+            // try to add the new value
             if (value != null) {
                 newAgg = aggregator.apply(key, value, newAgg);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 90e1e94..d339624 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -68,7 +68,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 
-            // try to add the new new value
+            // try to add the new value
             if (value != null) {
                 if (newAgg == null) {
                     newAgg = value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 9730511..ec26866 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -104,7 +104,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
                         if (oldAgg == null)
                             oldAgg = initializer.apply();
 
-                        // try to add the new new value (there will never be old value)
+                        // try to add the new value (there will never be old value)
                         T newAgg = aggregator.apply(key, value, oldAgg);
 
                         // update the store with the new value

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index c20601a..7d02f11 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -98,7 +98,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
                         V oldAgg = entry.value;
                         V newAgg = oldAgg;
 
-                        // try to add the new new value (there will never be old value)
+                        // try to add the new value (there will never be old value)
                         if (newAgg == null) {
                             newAgg = value;
                         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 973de0f..0fe3e1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -84,7 +84,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
                 newAgg = remove.apply(key, value.oldValue, newAgg);
             }
 
-            // then try to add the new new value
+            // then try to add the new value
             if (value.newValue != null) {
                 newAgg = add.apply(key, value.newValue, newAgg);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 744cc9c..62484a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -72,7 +72,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 
-            // first try to add the new new value
+            // first try to add the new value
             if (value.newValue != null) {
                 if (newAgg == null) {
                     newAgg = value.newValue;

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8180b2c..06f45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -469,7 +469,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                         transactionInFlight = false;
                     } catch (final ProducerFencedException ignore) {
                         /* TODO
-                         * this should actually never happen atm as we we guard the call to #abortTransaction
+                         * this should actually never happen atm as we guard the call to #abortTransaction
                          * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException
                          * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got
                          * fixed and fall-back to this catch-and-swallow code

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/tests/setup.cfg
----------------------------------------------------------------------
diff --git a/tests/setup.cfg b/tests/setup.cfg
index c70f1e4..974d5bb 100644
--- a/tests/setup.cfg
+++ b/tests/setup.cfg
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # This file defines naming convention and root search directory for autodiscovery of
 # pytest unit tests for the system test service classes.

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/tests/unit/setup.cfg
----------------------------------------------------------------------
diff --git a/tests/unit/setup.cfg b/tests/unit/setup.cfg
index e757a99..3470da1 100644
--- a/tests/unit/setup.cfg
+++ b/tests/unit/setup.cfg
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # To ease possible confusion, prefix muckrake *unit* tests with 'check' instead of 'test', since
 # many muckrake files, classes, and methods have 'test' somewhere in the name

http://git-wip-us.apache.org/repos/asf/kafka/blob/86062e9a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 0436d67..d7572b0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -289,7 +289,7 @@ public class ProducerPerformance {
                .metavar("TRANSACTION-DURATION")
                .dest("transactionDurationMs")
                .setDefault(0L)
-               .help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
+               .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");
 
 
         return parser;