You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/06 02:09:08 UTC

[01/21] kafka git commit: MINOR: Revert 0.10.0 branch to SNAPSHOT per change in release process

Repository: kafka
Updated Branches:
  refs/heads/0.10.0 e0ac36f05 -> 0773bc4ba


MINOR: Revert 0.10.0 branch to SNAPSHOT per change in release process

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Ewen Cheslack-Postava

Closes #1126 from gwenshap/minor-release-version


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0773bc4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0773bc4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0773bc4b

Branch: refs/heads/0.10.0
Commit: 0773bc4ba604bc8b57040583ac4c1cb6832ba188
Parents: aa6f0d8
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Mar 23 15:41:07 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 gradle.properties           | 2 +-
 kafka-merge-pr.py           | 2 +-
 tests/kafkatest/__init__.py | 3 +--
 3 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0773bc4b/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 7f30b4d..b058e58 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
 group=org.apache.kafka
 # NOTE: When you change this version number, you should also make sure to update
 # the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.10.0.0
+version=0.10.0.0-SNAPSHOT
 scalaVersion=2.10.6
 task=build
 org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m

http://git-wip-us.apache.org/repos/asf/kafka/blob/0773bc4b/kafka-merge-pr.py
----------------------------------------------------------------------
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index 2345dbb..e124105 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -72,7 +72,7 @@ RELEASE_BRANCH_PREFIX = "0."
 
 DEV_BRANCH_NAME = "trunk"
 
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.1")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.0")
 
 def get_json(url):
     try:

http://git-wip-us.apache.org/repos/asf/kafka/blob/0773bc4b/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index e1c87b7..df1a612 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -23,5 +23,4 @@
 # Instead, in trunk, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
-__version__ = '0.10.0.0'
-
+__version__ = '0.10.0.0.dev0'


[11/21] kafka git commit: KAFKA-2844; Separate keytabs for sasl tests

Posted by gw...@apache.org.
KAFKA-2844; Separate keytabs for sasl tests

Use a different keytab for server and client in SASL tests

Also:
* Improve approach used to build the JAAS files programmatically
* Delete stale `kafka_jaas.conf` file
* Move `FourLetterWords` to its own file, add `Zk` prefix and clean-up its usage

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Harsha Chintalapani, Gwen Shapira

Closes #533 from ijuma/separate-keytabs-for-sasl-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c588a72a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c588a72a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c588a72a

Branch: refs/heads/0.10.0
Commit: c588a72ad21f313d0c0ced11f083eca18fab84a1
Parents: b5de412
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri Apr 1 15:25:35 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 core/src/test/resources/kafka_jaas.conf         |  29 ----
 .../scala/integration/kafka/api/SaslSetup.scala |  39 ++---
 .../security/auth/ZkAuthorizationTest.scala     |  47 ++----
 .../scala/unit/kafka/utils/JaasTestUtils.scala  | 156 ++++++++++++-------
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  66 ++++----
 .../scala/unit/kafka/zk/ZkFourLetterWords.scala |  47 ++++++
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |  48 ++----
 7 files changed, 217 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/resources/kafka_jaas.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/kafka_jaas.conf b/core/src/test/resources/kafka_jaas.conf
deleted file mode 100644
index b097e26..0000000
--- a/core/src/test/resources/kafka_jaas.conf
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
-  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
-  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
-  * License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations under the License.
-  */
-KafkaClient {
-	com.sun.security.auth.module.Krb5LoginModule required debug=true
-	useKeyTab=true
-	storeKey=true
-	serviceName="kafka"
-	keyTab="$keytab-location"
-	principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
-	com.sun.security.auth.module.Krb5LoginModule required debug=true
-	useKeyTab=true
-	storeKey=true
-	serviceName="kafka"
-	keyTab="$keytab-location"
-	principal="kafka/localhost@EXAMPLE.COM";
-};

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 8255e6a..967cae1 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -35,8 +35,7 @@ case object KafkaSasl extends SaslSetupMode
 case object Both extends SaslSetupMode
 
 /*
- * Trait used in SaslTestHarness and EndToEndAuthorizationTest
- * currently to setup a keytab and jaas files.
+ * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab and jaas files.
  */
 trait SaslSetup {
   private val workDir = TestUtils.tempDir()
@@ -46,34 +45,26 @@ trait SaslSetup {
   def startSasl(mode: SaslSetupMode = Both) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
-    val keytabFile = createKeytabAndSetConfiguration(mode)
+    val (serverKeytabFile, clientKeytabFile) = createKeytabsAndSetConfiguration(mode)
     kdc.start()
-    kdc.createPrincipal(keytabFile, "client", "kafka/localhost")
+    kdc.createPrincipal(serverKeytabFile, "kafka/localhost")
+    kdc.createPrincipal(clientKeytabFile, "client")
     if (mode == Both || mode == ZkSasl)
       System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
-  protected def createKeytabAndSetConfiguration(mode: SaslSetupMode): File = {
-    val (keytabFile, jaasFile) = createKeytabAndJaasFiles(mode)
+  protected def createKeytabsAndSetConfiguration(mode: SaslSetupMode): (File, File) = {
+    val serverKeytabFile = TestUtils.tempFile()
+    val clientKeytabFile = TestUtils.tempFile()
+    val jaasFile = mode match {
+      case ZkSasl => JaasTestUtils.writeZkFile()
+      case KafkaSasl => JaasTestUtils.writeKafkaFile(serverKeytabFile, clientKeytabFile)
+      case Both => JaasTestUtils.writeZkAndKafkaFiles(serverKeytabFile, clientKeytabFile)
+    }
     // This will cause a reload of the Configuration singleton when `getConfiguration` is called
     Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath)
-    keytabFile
-  }
-
-  private def createKeytabAndJaasFiles(mode: SaslSetupMode): (File, File) = {
-    val keytabFile = TestUtils.tempFile()
-    val jaasFileName: String = mode match {
-      case ZkSasl =>
-        JaasTestUtils.genZkFile
-      case KafkaSasl =>
-        JaasTestUtils.genKafkaFile(keytabFile.getAbsolutePath)
-      case _ =>
-        JaasTestUtils.genZkAndKafkaFile(keytabFile.getAbsolutePath)
-    }
-    val jaasFile = new File(jaasFileName)
-
-    (keytabFile, jaasFile)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+    (serverKeytabFile, clientKeytabFile)
   }
 
   def closeSasl() {
@@ -81,7 +72,7 @@ trait SaslSetup {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
-    System.clearProperty("zookeeper.authProvider.1");
+    System.clearProperty("zookeeper.authProvider.1")
     Configuration.setConfiguration(null)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 6a533b3..ab5324c 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -22,17 +22,17 @@ import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL, Stat}
+import org.apache.zookeeper.data.{ACL}
 import org.junit.Assert._
-import org.junit.{After, Before, BeforeClass, Test}
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
 import scala.util.{Try, Success, Failure}
 import javax.security.auth.login.Configuration
 
+class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
+  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile
+  val authProvider = "zookeeper.authProvider.1"
 
-class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
-  val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
-  val authProvider: String = "zookeeper.authProvider.1"
   @Before
   override def setUp() {
     Configuration.setConfiguration(null)
@@ -65,12 +65,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       JaasUtils.isZkSecurityEnabled()
       fail("Should have thrown an exception")
     } catch {
-      case e: KafkaException => {
-        // Expected
-      }
-      case e: Exception => {
-        fail(e.toString)
-      }
+      case e: KafkaException => // Expected
     }
   }
 
@@ -241,10 +236,10 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       case false => list.size == 1
     } 
     isListSizeCorrect && list.asScala.forall(
-        secure match {
-          case true => isAclSecure
-          case false => isAclUnsecure
-        })
+      secure match {
+        case true => isAclSecure
+        case false => isAclUnsecure
+      })
   }
   
   /**
@@ -255,15 +250,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   private def isAclSecure(acl: ACL): Boolean = {
     info(s"ACL $acl")
     acl.getPerms match {
-      case 1 => {
-        acl.getId.getScheme.equals("world")
-      }
-      case 31 => {
-        acl.getId.getScheme.equals("sasl")
-      }
-      case _: Int => {
-        false
-      }
+      case 1 => acl.getId.getScheme.equals("world")
+      case 31 => acl.getId.getScheme.equals("sasl")
+      case _ => false
     }
   }
   
@@ -273,12 +262,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
   private def isAclUnsecure(acl: ACL): Boolean = {
     info(s"ACL $acl")
     acl.getPerms match {
-      case 31 => {
-        acl.getId.getScheme.equals("world")
-      }
-      case _: Int => {
-        false
-      }
+      case 31 => acl.getId.getScheme.equals("world")
+      case _ => false
     }
   }
   
@@ -323,7 +308,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{
       case "/" => result
       // For all other paths, try to delete it
       case path =>
-        try{
+        try {
           zkUtils.deletePath(path)
           Failure(new Exception(s"Have been able to delete $path"))
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index cf08830..a14cd3f 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -16,72 +16,110 @@
  */
 package kafka.utils
 
+import java.io.{File, BufferedWriter, FileWriter}
 
 object JaasTestUtils {
-  // ZooKeeper vals
-  val zkServerContextName = "Server"
-  val zkClientContextName = "Client"
-  val userSuperPasswd = "adminpasswd"
-  val user = "fpj"
-  val userPasswd = "fpjsecret"
-  val zkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
-  //Kafka vals
-  val kafkaServerContextName = "KafkaServer"
-  val kafkaClientContextName = "KafkaClient"
-  val kafkaServerPrincipal = "client@EXAMPLE.COM"
-  val kafkaClientPrincipal = "kafka/localhost@EXAMPLE.COM"
-  val kafkaModule = "com.sun.security.auth.module.Krb5LoginModule"
-  
-  def genZkFile: String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeZkToOutputStream(jaasOutputStream)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  case class Krb5LoginModule(contextName: String,
+                             useKeyTab: Boolean,
+                             storeKey: Boolean,
+                             keyTab: String,
+                             principal: String,
+                             debug: Boolean,
+                             serviceName: Option[String]) {
+    def toJaasSection: JaasSection = {
+      JaasSection(
+        contextName,
+        "com.sun.security.auth.module.Krb5LoginModule",
+        debug = debug,
+        entries = Map(
+          "useKeyTab" -> useKeyTab.toString,
+          "storeKey" -> storeKey.toString,
+          "keyTab" -> keyTab,
+          "principal" -> principal
+        ) ++ serviceName.map(s => Map("serviceName" -> s)).getOrElse(Map.empty)
+      )
+    }
+  }
+
+  case class JaasSection(contextName: String,
+                         moduleName: String,
+                         debug: Boolean,
+                         entries: Map[String, String]) {
+    override def toString: String = {
+      s"""|$contextName {
+          |  $moduleName required
+          |  debug=$debug
+          |  ${entries.map { case (k, v) => s"""$k="$v"""" }.mkString("", "\n|  ", ";")}
+          |};
+          |""".stripMargin
+    }
+  }
+
+  private val ZkServerContextName = "Server"
+  private val ZkClientContextName = "Client"
+  private val ZkUserSuperPasswd = "adminpasswd"
+  private val ZkUser = "fpj"
+  private val ZkUserPassword = "fpjsecret"
+  private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
+
+  private val KafkaServerContextName = "KafkaServer"
+  private val KafkaServerPrincipal = "kafka/localhost@EXAMPLE.COM"
+  private val KafkaClientContextName = "KafkaClient"
+  private val KafkaClientPrincipal = "client@EXAMPLE.COM"
+
+  def writeZkFile(): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, zkSections)
     jaasFile.getCanonicalPath
   }
-  
-  def genKafkaFile(keytabLocation: String): String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  def writeKafkaFile(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation))
     jaasFile.getCanonicalPath
   }
-  
-  def genZkAndKafkaFile(keytabLocation: String): String = {
-    val jaasFile = java.io.File.createTempFile("jaas", ".conf")
-    val jaasOutputStream = new java.io.FileOutputStream(jaasFile)
-    writeKafkaToOutputStream(jaasOutputStream, keytabLocation)
-    jaasOutputStream.write("\n\n".getBytes)
-    writeZkToOutputStream(jaasOutputStream)
-    jaasOutputStream.close()
-    jaasFile.deleteOnExit()
+
+  def writeZkAndKafkaFiles(serverKeyTabLocation: File, clientKeyTabLocation: File): String = {
+    val jaasFile = TestUtils.tempFile()
+    writeToFile(jaasFile, kafkaSections(serverKeyTabLocation, clientKeyTabLocation) ++ zkSections)
     jaasFile.getCanonicalPath
   }
-  
-  private def writeZkToOutputStream(jaasOutputStream: java.io.FileOutputStream) {
-    jaasOutputStream.write(s"$zkServerContextName {\n\t$zkModule required\n".getBytes)
-    jaasOutputStream.write(s"""\tuser_super="$userSuperPasswd"\n""".getBytes)
-    jaasOutputStream.write(s"""\tuser_$user="$userPasswd";\n};\n\n""".getBytes)
-    jaasOutputStream.write(s"""$zkClientContextName {\n\t$zkModule required\n""".getBytes)
-    jaasOutputStream.write(s"""\tusername="$user"\n""".getBytes)
-    jaasOutputStream.write(s"""\tpassword="$userPasswd";\n};""".getBytes)
+
+  private def zkSections: Seq[JaasSection] = Seq(
+    JaasSection(ZkServerContextName, ZkModule, false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)),
+    JaasSection(ZkClientContextName, ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword))
+  )
+
+  private def kafkaSections(serverKeytabLocation: File, clientKeytabLocation: File): Seq[JaasSection] = {
+    Seq(
+      Krb5LoginModule(
+        KafkaServerContextName,
+        useKeyTab = true,
+        storeKey = true,
+        keyTab = serverKeytabLocation.getAbsolutePath,
+        principal = KafkaServerPrincipal,
+        debug = true,
+        serviceName = Some("kafka")),
+      Krb5LoginModule(
+        KafkaClientContextName,
+        useKeyTab = true,
+        storeKey = true,
+        keyTab = clientKeytabLocation.getAbsolutePath,
+        principal = KafkaClientPrincipal,
+        debug = true,
+        serviceName = Some("kafka")
+      )
+    ).map(_.toJaasSection)
   }
-  
-  private def writeKafkaToOutputStream(jaasOutputStream: java.io.FileOutputStream, keytabLocation: String) {
-    jaasOutputStream.write(s"$kafkaClientContextName {\n\t$kafkaModule required debug=true\n".getBytes)
-    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
-    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
-    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
-    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
-    jaasOutputStream.write(s"""\tprincipal="$kafkaServerPrincipal";\n};\n\n""".getBytes)
-    jaasOutputStream.write(s"""$kafkaServerContextName {\n\t$kafkaModule required debug=true\n""".getBytes)
-    jaasOutputStream.write(s"\tuseKeyTab=true\n".getBytes)
-    jaasOutputStream.write(s"\tstoreKey=true\n".getBytes)
-    jaasOutputStream.write(s"""\tserviceName="kafka"\n""".getBytes)
-    jaasOutputStream.write(s"""\tkeyTab="$keytabLocation"\n""".getBytes)
-    jaasOutputStream.write(s"""\tprincipal="$kafkaClientPrincipal";\n};""".getBytes)
+
+  private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String =
+    jaasSections.mkString
+
+  private def writeToFile(file: File, jaasSections: Seq[JaasSection]) {
+    val writer = new BufferedWriter(new FileWriter(file))
+    try writer.write(jaasSectionsToString(jaasSections))
+    finally writer.close()
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 32c7a5d..c2c25ed 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -17,10 +17,11 @@
 
 package kafka.zk
 
-import java.util.ArrayList
-import java.util.Collection
+import java.lang.Iterable
 import javax.security.auth.login.Configuration
 
+import scala.collection.JavaConverters._
+
 import kafka.consumer.ConsumerConfig
 import kafka.utils.ZkUtils
 import kafka.utils.ZKCheckedEphemeral
@@ -30,26 +31,24 @@ import org.apache.zookeeper.CreateMode
 import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher
 import org.apache.zookeeper.ZooDefs.Ids
-import org.I0Itec.zkclient.exception.{ZkException,ZkNodeExistsException}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.junit.{After, Before, Test, Assert}
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+import org.junit.runner.RunWith
 
 object ZKEphemeralTest {
+
   @Parameters
-  def enableSecurityOptions: Collection[Array[java.lang.Boolean]] = {
-    val list = new ArrayList[Array[java.lang.Boolean]]()
-    list.add(Array(true))
-    list.add(Array(false))
-    list
-  }
+  def enableSecurityOptions: Iterable[Array[java.lang.Boolean]] =
+    Seq[Array[java.lang.Boolean]](Array(true), Array(false)).asJava
+
 }
 
 @RunWith(value = classOf[Parameterized])
 class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
-  val jaasFile: String = kafka.utils.JaasTestUtils.genZkFile
-  val authProvider: String = "zookeeper.authProvider.1"
+  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile()
+  val authProvider = "zookeeper.authProvider.1"
   var zkSessionTimeoutMs = 1000
   
   @Before
@@ -103,17 +102,14 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
    */
   @Test
   def testZkWatchedEphemeral = {
-    var path = "/zwe-test"
-    testCreation(path)
-    path = "/zwe-test-parent/zwe-test"
-    testCreation(path)
+    testCreation("/zwe-test")
+    testCreation("/zwe-test-parent/zwe-test")
   }
  
   private def testCreation(path: String) {
     val zk = zkUtils.zkConnection.getZookeeper
     val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     var created = false
-    var counter = 10
 
     zk.exists(path, new Watcher() {
       def process(event: WatchedEvent) {
@@ -140,19 +136,19 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     //Creates a second session
     val (zkClient2, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
     val zk2 = zkConnection2.getZookeeper
-    var zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
+    val zwe = new ZKCheckedEphemeral(path, "", zk2, JaasUtils.isZkSecurityEnabled())
 
     // Creates znode for path in the first session
     zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
     //Bootstraps the ZKWatchedEphemeral object
-    var gotException = false;
-    try {
-      zwe.create()
-    } catch {
-      case e: ZkNodeExistsException =>
-        gotException = true
-    }
+    val gotException =
+      try {
+        zwe.create()
+        false
+      } catch {
+        case e: ZkNodeExistsException => true
+      }
     Assert.assertTrue(gotException)
     zkClient2.close()
   }
@@ -168,15 +164,15 @@ class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
     // Creates znode for path in the first session
     zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
-    var zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
+    val zwe = new ZKCheckedEphemeral(path, "", zk, JaasUtils.isZkSecurityEnabled())
     //Bootstraps the ZKWatchedEphemeral object
-    var gotException = false;
-    try {
-      zwe.create()
-    } catch {
-      case e: ZkNodeExistsException =>
-        gotException = true
-    }
+    val gotException =
+      try {
+        zwe.create()
+        false
+      } catch {
+        case e: ZkNodeExistsException => true
+      }
     Assert.assertFalse(gotException)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
new file mode 100644
index 0000000..6eaee70
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZkFourLetterWords.scala
@@ -0,0 +1,47 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.zk
+
+import java.io.IOException
+import java.net.{SocketTimeoutException, Socket, InetAddress, InetSocketAddress}
+
+/**
+  * ZooKeeper responds to a small set of commands. Each command is composed of four letters. You issue the commands to
+  * ZooKeeper via telnet or nc, at the client port.
+  *
+  * Three of the more interesting commands: "stat" gives some general information about the server and connected
+  * clients, while "srvr" and "cons" give extended details on server and connections respectively.
+  */
+object ZkFourLetterWords {
+  def sendStat(host: String, port: Int, timeout: Int) {
+    val hostAddress =
+      if (host != null) new InetSocketAddress(host, port)
+      else new InetSocketAddress(InetAddress.getByName(null), port)
+    val sock = new Socket()
+    try {
+      sock.connect(hostAddress, timeout)
+      val outStream = sock.getOutputStream
+      outStream.write("stat".getBytes)
+      outStream.flush()
+    } catch {
+      case e: SocketTimeoutException => throw new IOException("Exception while sending 4lw")
+    } finally {
+      sock.close
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c588a72a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index d618ba6..95f4e35 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -17,40 +17,12 @@
 
 package kafka.zk
 
-import java.io._
-import java.net._
 import javax.security.auth.login.Configuration
-import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import kafka.utils.{ZkUtils, Logging, CoreUtils}
 import org.junit.{After, Before}
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 
-object FourLetterWords {
-  def sendStat(host: String, port: Int, timeout: Int) {
-    val hostAddress = if (host != null)
-      new InetSocketAddress(host, port)
-    else
-      new InetSocketAddress(InetAddress.getByName(null), port)
-    val sock = new Socket()
-    var reader: BufferedReader = null
-    sock.connect(hostAddress, timeout)
-    try {
-      val outstream = sock.getOutputStream
-      outstream.write("stat".getBytes)
-      outstream.flush
-    } catch {
-      case e: SocketTimeoutException => {
-        throw new IOException("Exception while sending 4lw")
-      }
-    } finally {
-      sock.close
-      if (reader != null)
-        reader.close
-    }
-  }
-}
-
 trait ZooKeeperTestHarness extends JUnitSuite with Logging {
   var zookeeper: EmbeddedZookeeper = null
   var zkPort: Int = -1
@@ -73,18 +45,20 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
      CoreUtils.swallow(zkUtils.close())
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown())
-      
-    var isDown = false  
-    while(!isDown) {
+
+    def isDown(): Boolean = {
       try {
-        FourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
-      } catch {
-        case _: Throwable => {
-          info("Server is down")
-          isDown = true
-        }
+        ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
+        false
+      } catch { case _: Throwable =>
+        debug("Server is down")
+        true
       }
     }
+
+    Iterator.continually(isDown()).exists(identity)
+
     Configuration.setConfiguration(null)
   }
+
 }


[07/21] kafka git commit: KAFKA-3445: Validate TASKS_MAX_CONFIG's lower bound

Posted by gw...@apache.org.
KAFKA-3445: Validate TASKS_MAX_CONFIG's lower bound

Currently the property TASKS_MAX_CONFIG is not validated against nonsensical values such as 0. This patch leverages the Range.atLeast() method to ensure value is at least 1.

Author: Ryan P <Ry...@Gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1132 from rnpridgeon/KAFKA-3445


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e8593d1b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e8593d1b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e8593d1b

Branch: refs/heads/0.10.0
Commit: e8593d1b4529a7b9f3471ac8c1411dba336d6708
Parents: 9c5af25
Author: Ryan P <Ry...@Gmail.com>
Authored: Thu Mar 24 10:12:19 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/runtime/ConnectorConfig.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e8593d1b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index e21faf6..e439552 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
 
 import java.util.HashMap;
 import java.util.Map;
@@ -54,6 +56,8 @@ public class ConnectorConfig extends AbstractConfig {
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
     public static final int TASKS_MAX_DEFAULT = 1;
+    private static final int TASKS_MIN_CONFIG = 1;
+
     private static final String TASK_MAX_DISPLAY = "Tasks max";
 
     public static final String TOPICS_CONFIG = "topics";
@@ -67,7 +71,7 @@ public class ConnectorConfig extends AbstractConfig {
         config = new ConfigDef()
                 .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
-                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,  atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
                 .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
     }
 


[09/21] kafka git commit: MINOR: Add check for empty topics iterator in ReplicaVerificationTool.

Posted by gw...@apache.org.
MINOR: Add check for empty topics iterator in ReplicaVerificationTool.

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Guozhang Wang, Gwen Shapira

Closes #1167 from SinghAsDev/minorFixRelicaLagTool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5de4122
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5de4122
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5de4122

Branch: refs/heads/0.10.0
Commit: b5de41227f11a495d5dd7e1cf785220365d84534
Parents: ae0a5a0
Author: Ashish Singh <as...@cloudera.com>
Authored: Fri Apr 1 14:12:49 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5de4122/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index fe4968d..71bf0c0 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -128,6 +128,12 @@ object ReplicaVerificationTool extends Logging {
         else
           false
     )
+
+    if (filteredTopicMetadata.isEmpty) {
+      error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
+      System.exit(1)
+    }
+
     val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
       topicMetadataResponse =>
         topicMetadataResponse.partitionsMetadata.flatMap(


[14/21] kafka git commit: HOTFIX: set timestamp in SinkNode

Posted by gw...@apache.org.
HOTFIX: set timestamp in SinkNode

guozhangwang
Setting the timestamp in produced records in SinkNode. This forces the producer record's timestamp same as the context's timestamp.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1137 from ymatsuda/set_timestamp_in_sinknode


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/80ba01e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/80ba01e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/80ba01e1

Branch: refs/heads/0.10.0
Commit: 80ba01e16ba17c48058987ee3a1384f1e23df343
Parents: 625c516
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Apr 4 14:57:15 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/SinkNode.java     | 2 +-
 streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/80ba01e1/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index ffc72fd..31a558b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -57,7 +57,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public void process(K key, V value) {
         // send to all the registered topics
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer, partitioner);
+        collector.send(new ProducerRecord<>(topic, null, context.timestamp(), key, value), keySerializer, valSerializer, partitioner);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/80ba01e1/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 05713c1..0c56c26 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,6 +57,7 @@ public class KStreamTestDriver {
         this.topology = builder.build("X", null);
         this.stateDir = stateDir;
         this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
+        this.context.setTime(0L);
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();


[04/21] kafka git commit: KAFKA-2930: Update references to ZooKeeper in the docs.

Posted by gw...@apache.org.
KAFKA-2930: Update references to ZooKeeper in the docs.

Author: Flavio Junqueira <fp...@apache.org>

Reviewers: Ismael Juma, Gwen Shapira

Closes #615 from fpj/KAFKA-2930


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c216f8a8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c216f8a8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c216f8a8

Branch: refs/heads/0.10.0
Commit: c216f8a8e8ff6a3c140b9e0678c4362d4b035982
Parents: c588a72
Author: Flavio Junqueira <fp...@apache.org>
Authored: Fri Apr 1 15:57:39 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 docs/ops.html | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c216f8a8/docs/ops.html
----------------------------------------------------------------------
diff --git a/docs/ops.html b/docs/ops.html
index 541a01d..b239a0e 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -934,17 +934,17 @@ The final alerting we do is on the correctness of the data delivery. We audit th
 <h3><a id="zk" href="#zk">6.7 ZooKeeper</a></h3>
 
 <h4><a id="zkversion" href="#zkversion">Stable version</a></h4>
-At LinkedIn, we are running ZooKeeper 3.3.*. Version 3.3.3 has known serious issues regarding ephemeral node deletion and session expirations. After running into those issues in production, we upgraded to 3.3.4 and have been running that smoothly for over a year now.
+The current stable branch is 3.4 and the latest release of that branch is 3.4.6, which is the one ZkClient 0.7 uses. ZkClient is the client layer Kafka uses to interact with ZooKeeper.
 
 <h4><a id="zkops" href="#zkops">Operationalizing ZooKeeper</a></h4>
 Operationally, we do the following for a healthy ZooKeeper installation:
 <ul>
-  <li>Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, decent (but don't go nuts) hardware, try to keep redundant power and network paths, etc.</li>
-  <li>I/O segregation: if you do a lot of write type traffic you'll almost definitely want the transaction logs on a different disk group than application logs and snapshots (the write to the ZooKeeper service has a synchronous write to disk, which can be slow).</li>
+  <li>Redundancy in the physical/hardware/network layout: try not to put them all in the same rack, decent (but don't go nuts) hardware, try to keep redundant power and network paths, etc. A typical ZooKeeper ensemble has 5 or 7 servers, which tolerates 2 and 3 servers down, respectively. If you have a small deployment, then using 3 servers is acceptable, but keep in mind that you'll only be able to tolerate 1 server down in this case. </li>
+  <li>I/O segregation: if you do a lot of write type traffic you'll almost definitely want the transaction logs on a dedicated disk group. Writes to the transaction log are synchronous (but batched for performance), and consequently, concurrent writes can significantly affect performance. ZooKeeper snapshots can be one such a source of concurrent writes, and ideally should be written on a disk group separate from the transaction log. Snapshots are writtent to disk asynchronously, so it is typically ok to share with the operating system and message log files. You can configure a server to use a separate disk group with the dataLogDir parameter.</li>
   <li>Application segregation: Unless you really understand the application patterns of other apps that you want to install on the same box, it can be a good idea to run ZooKeeper in isolation (though this can be a balancing act with the capabilities of the hardware).</li>
   <li>Use care with virtualization: It can work, depending on your cluster layout and read/write patterns and SLAs, but the tiny overheads introduced by the virtualization layer can add up and throw off ZooKeeper, as it can be very time sensitive</li>
-  <li>ZooKeeper configuration and monitoring: It's java, make sure you give it 'enough' heap space (We usually run them with 3-5G, but that's mostly due to the data set size we have here). Unfortunately we don't have a good formula for it. As far as monitoring, both JMX and the 4 letter words (4lw) commands are very useful, they do overlap in some cases (and in those cases we prefer the 4 letter commands, they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure)</li>
-  <li>Don't overbuild the cluster: large clusters, especially in a write heavy usage pattern, means a lot of intracluster communication (quorums on the writes and subsequent cluster member updates), but don't underbuild it (and risk swamping the cluster).</li>
-  <li>Try to run on a 3-5 node cluster: ZooKeeper writes use quorums and inherently that means having an odd number of machines in a cluster. Remember that a 5 node cluster will cause writes to slow down compared to a 3 node cluster, but will allow more fault tolerance.</li>
+  <li>ZooKeeper configuration: It's java, make sure you give it 'enough' heap space (We usually run them with 3-5G, but that's mostly due to the data set size we have here). Unfortunately we don't have a good formula for it, but keep in mind that allowing for more ZooKeeper state means that snapshots can become large, and large snapshots affect recovery time. In fact, if the snapshot becomes too large (a few gigabytes), then you may need to increase the initLimit parameter to give enough time for servers to recover and join the ensemble.</li> 
+  <li>Monitoring: Both JMX and the 4 letter words (4lw) commands are very useful, they do overlap in some cases (and in those cases we prefer the 4 letter commands, they seem more predictable, or at the very least, they work better with the LI monitoring infrastructure)</li>
+  <li>Don't overbuild the cluster: large clusters, especially in a write heavy usage pattern, means a lot of intracluster communication (quorums on the writes and subsequent cluster member updates), but don't underbuild it (and risk swamping the cluster). Having more servers adds to your read capacity.</li>
 </ul>
 Overall, we try to keep the ZooKeeper system as small as will handle the load (plus standard growth capacity planning) and as simple as possible. We try not to do anything fancy with the configuration or application layout as compared to the official release as well as keep it as self contained as possible. For these reasons, we tend to skip the OS packaged versions, since it has a tendency to try to put things in the OS standard hierarchy, which can be 'messy', for want of a better way to word it.


[21/21] kafka git commit: KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks

Posted by gw...@apache.org.
KAFKA-3477: extended KStream/KTable API to specify custom partitioner for sinks

Author: mjsax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1180 from mjsax/kafka-3477-streamPartitioner-DSL


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c5fe7bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c5fe7bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c5fe7bd

Branch: refs/heads/0.10.0
Commit: 5c5fe7bd795f5aab5248fb718c61c8ca3f2f571a
Parents: 35fadbf
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Apr 5 15:56:09 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  2 +-
 .../apache/kafka/streams/kstream/KStream.java   | 88 +++++++++++++++++---
 .../apache/kafka/streams/kstream/KTable.java    | 88 +++++++++++++++++---
 .../streams/kstream/internals/KStreamImpl.java  | 39 ++++++---
 .../streams/kstream/internals/KTableImpl.java   | 30 ++++++-
 5 files changed, 206 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 20958e4..e8fda10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes
  * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work
  * based on the assignment of the input topic partitions so that all partitions are being
- * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves
+ * consumed. If instances are added or failed, all instances will rebalance the partition assignment among themselves
  * to balance processing load.
  * <p>
  * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 2313b8b..e4933cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * KStream is an abstraction of a <i>record stream</i> of key-value pairs.
@@ -92,7 +93,7 @@ public interface KStream<K, V> {
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
-     * using default serializers and deserializers.
+     * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
      *
      * @param topic     the topic name
@@ -100,37 +101,98 @@ public interface KStream<K, V> {
     KStream<K, V> through(String topic);
 
     /**
+     * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+     * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic.
+     * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+     * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      * This is equivalent to calling {@link #to(Serde, Serde, String)} and
      * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param topic        the topic name
      */
     KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
+     * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(Serde, Serde, String...)}.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      *
      * @param topic     the topic name
      */
     void to(String topic);
 
     /**
-     * Materialize this stream to a topic.
+     * Materialize this stream to a topic using default serializers specified in the config and a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param topic     the topic name
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic. If {@code keySerde} provides a
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topic        the topic name
      */
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
+     * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Create a new {@link KStream} instance by applying a {@link org.apache.kafka.streams.kstream.Transformer} to all elements in this stream, one element at a time.
      *
      * @param transformerSupplier   the instance of {@link TransformerSupplier} that generates {@link org.apache.kafka.streams.kstream.Transformer}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 30ea882..581ee28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 /**
  * KTable is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
@@ -54,7 +55,7 @@ public interface KTable<K, V> {
 
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
-     * using default serializers and deserializers.
+     * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
      *
      * @param topic         the topic name
@@ -62,37 +63,98 @@ public interface KTable<K, V> {
     KTable<K, V> through(String topic);
 
     /**
+     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers
+     * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String)}.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
+     * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
+     * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      * This is equivalent to calling {@link #to(Serde, Serde, String)} and
      * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default key serde defined in the configuration will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default value serde defined in the configuration will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param topic        the topic name
      */
     KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config.
+     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
+     * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
+     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String)}.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default key serde defined in the configuration will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default value serde defined in the configuration will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
      *
      * @param topic         the topic name
      */
     void to(String topic);
 
     /**
-     * Materialize this stream to a topic.
+     * Materialize this stream to a topic using default serializers specified in the config
+     * and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
+     * Materialize this stream to a topic. If {@code keySerde} provides a
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
+     * &mdash; otherwise producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} is used.
      *
-     * @param keySerde  key serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param valSerde  value serde used to send key-value pairs,
-     *                  if not specified the default serde defined in the configs will be used
-     * @param topic     the topic name
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param topic        the topic name
      */
     void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
 
     /**
+     * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     *
+     * @param keySerde     key serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param valSerde     value serde used to send key-value pairs,
+     *                     if not specified the default serde defined in the configs will be used
+     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
+     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
+     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
+     *                     &mdash; otherwise {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used
+     * @param topic        the topic name
+     */
+    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
+
+    /**
      * Convert this stream to a new instance of {@link KStream}.
      */
     KStream<K, V> toStream();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5889e07..0fb3984 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -194,37 +194,56 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     }
 
     @Override
-    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
-        to(keySerde, valSerde, topic);
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+        to(keySerde, valSerde, partitioner, topic);
 
         return topology.stream(keySerde, valSerde, topic);
     }
 
     @Override
+    public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        return through(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public KStream<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+        return through(null, null, partitioner, topic);
+    }
+
+    @Override
     public KStream<K, V> through(String topic) {
-        return through(null, null, topic);
+        return through(null, null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(null, null, topic);
+        to(null, null, null, topic);
+    }
+
+    @Override
+    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+        to(null, null, partitioner, topic);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        to(keySerde, valSerde, null, topic);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
         String name = topology.newName(SINK_NAME);
-        StreamPartitioner<K, V> streamPartitioner = null;
 
         Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
         Serializer<V> valSerializer = keySerde == null ? null : valSerde.serializer();
-
-        if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
+        
+        if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
             WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
-            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
         }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c5fe7bd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index fd464a0..156f2db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
 import java.util.Collections;
@@ -133,25 +134,46 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
+                                StreamPartitioner<K, V> partitioner,
                                 String topic) {
-        to(keySerde, valSerde, topic);
+        to(keySerde, valSerde, partitioner, topic);
 
         return topology.table(keySerde, valSerde, topic);
     }
 
     @Override
+    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
+        return through(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
+        return through(null, null, partitioner, topic);
+    }
+
+    @Override
     public KTable<K, V> through(String topic) {
-        return through(null, null, topic);
+        return through(null, null, null, topic);
     }
 
     @Override
     public void to(String topic) {
-        to(null, null, topic);
+        to(null, null, null, topic);
+    }
+
+    @Override
+    public void to(StreamPartitioner<K, V> partitioner, String topic) {
+        to(null, null, partitioner, topic);
     }
 
     @Override
     public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
-        this.toStream().to(keySerde, valSerde, topic);
+        this.toStream().to(keySerde, valSerde, null, topic);
+    }
+
+    @Override
+    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
+        this.toStream().to(keySerde, valSerde, partitioner, topic);
     }
 
     @Override


[05/21] kafka git commit: Changing version to 0.10.1.0-SNAPSHOT

Posted by gw...@apache.org.
Changing version to 0.10.1.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc47e230
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc47e230
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc47e230

Branch: refs/heads/0.10.0
Commit: bc47e2306d9558a0f0976b15a554fc2d84ade5fc
Parents: e0ac36f
Author: Gwen Shapira <cs...@gmail.com>
Authored: Mon Mar 21 09:53:17 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 gradle.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bc47e230/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index b058e58..0a612f6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
 group=org.apache.kafka
 # NOTE: When you change this version number, you should also make sure to update
 # the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.10.0.0-SNAPSHOT
+version=0.10.1.0-SNAPSHOT
 scalaVersion=2.10.6
 task=build
 org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m


[20/21] kafka git commit: MINOR: update new version in additional places

Posted by gw...@apache.org.
MINOR: update new version in additional places

matching set of version fixes. ewencp junrao

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1110 from gwenshap/minor-fix-version-010


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa6f0d8d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa6f0d8d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa6f0d8d

Branch: refs/heads/0.10.0
Commit: aa6f0d8d5f084126bbba1fb6b40e392f2b908262
Parents: 050bf60
Author: Gwen Shapira <cs...@gmail.com>
Authored: Mon Mar 21 18:58:45 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 kafka-merge-pr.py           | 2 +-
 tests/kafkatest/__init__.py | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa6f0d8d/kafka-merge-pr.py
----------------------------------------------------------------------
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index f26a0a9..2345dbb 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -72,7 +72,7 @@ RELEASE_BRANCH_PREFIX = "0."
 
 DEV_BRANCH_NAME = "trunk"
 
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.1.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.1")
 
 def get_json(url):
     try:

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa6f0d8d/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 10163a0..e1c87b7 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -23,4 +23,5 @@
 # Instead, in trunk, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
-__version__ = '0.10.1.0.dev0'
+__version__ = '0.10.0.0'
+


[12/21] kafka git commit: KAFKA-3419: clarify difference between topic subscription and partition assignment

Posted by gw...@apache.org.
KAFKA-3419: clarify difference between topic subscription and partition assignment

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ashish Singh, Ismael Juma, Guozhang Wang

Closes #1158 from hachikuji/KAFKA-3419


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb08e493
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb08e493
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb08e493

Branch: refs/heads/0.10.0
Commit: eb08e493228e2e34eae361922796dcffb920e78d
Parents: 09f4a7f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Apr 3 13:44:05 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 41 ++++++++++----------
 1 file changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb08e493/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b15d07f..c457c83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -125,8 +125,9 @@ import java.util.regex.Pattern;
  * commits (note that offsets are always committed for a given consumer group), etc.
  * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
  * <p>
- * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)},
- * which disables this dynamic partition assignment.
+ * It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions
+ * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
+ * assignment and consumer group coordination will be disabled.
  *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
@@ -242,27 +243,23 @@ import java.util.regex.Pattern;
  * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
  * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
  *
- * <h4>Subscribing To Specific Partitions</h4>
+ * <h4><a name="manualassignment">Manual Partition Assignment</a></h4>
  *
- * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
- * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
- * instances of our program can divided up the work of processing records.
+ * In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a
+ * fair share of the partitions for those topics based on the active consumers in the group. However, in
+ * some cases you may need finer control over the specific partitions that are assigned. For example:
  * <p>
- * In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt
- * will be made to rebalance partitions to other instances.
- * <p>
- * There are several cases where this makes sense:
  * <ul>
- * <li>The first case is if the process is maintaining some kind of local state associated with that partition (like a
- * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
- * <li>Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ * <li>If the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
+ * <li>If the process itself is highly available and will be restarted if it fails (perhaps using a
  * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
- * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process
  * will be restarted on another machine.
  * </ul>
  * <p>
- * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
- * partitions:
+ * To use this mode, instead of subscribing to the topic using {@link #subscribe(Collection) subscribe}, you just call
+ * {@link #assign(Collection)} with the full list of partitions that you want to consume.
  *
  * <pre>
  *     String topic = &quot;foo&quot;;
@@ -271,11 +268,15 @@ import java.util.regex.Pattern;
  *     consumer.assign(Arrays.asList(partition0, partition1));
  * </pre>
  *
- * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
- * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ * Once assigned, you can call {@link #poll(long) poll} in a loop, just as in the preceding examples to consume
+ * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
+ * will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does
+ * not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
+ * acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should
+ * usually ensure that the groupId is unique for each consumer instance.
  * <p>
- * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
- * balancing) using the same consumer instance.
+ * Note that it isn't possible to mix manual partition assignment (i.e. using {@link #assign(Collection) assign})
+ * with dynamic partition assignment through topic subscription (i.e. using {@link #subscribe(Collection) subscribe}).
  *
  * <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</h4>
  *


[10/21] kafka git commit: MINOR: small code optimizations in streams

Posted by gw...@apache.org.
MINOR: small code optimizations in streams

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1176 from ymatsuda/optimize


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/09f4a7fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/09f4a7fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/09f4a7fd

Branch: refs/heads/0.10.0
Commit: 09f4a7fdc923b03a2f2ea29ecb0659ca450e8149
Parents: c216f8a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Fri Apr 1 17:14:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/PartitionGroup.java     |  6 +++-
 .../streams/processor/internals/StreamTask.java | 12 +++----
 .../processor/internals/StreamThread.java       | 35 ++++++++------------
 3 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index b487ff5..3d8f792 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -49,6 +49,10 @@ public class PartitionGroup {
         public TopicPartition partition() {
             return queue.partition();
         }
+
+        public RecordQueue queue() {
+            return queue;
+        }
     }
 
     // since task is thread-safe, we do not need to synchronize on local variables
@@ -88,7 +92,7 @@ public class PartitionGroup {
             // get the first record from this queue.
             record = queue.poll();
 
-            if (queue.size() > 0) {
+            if (!queue.isEmpty()) {
                 queuesByTime.offer(queue);
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index afa303c..61aeced 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -179,7 +179,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
                 // after processing this record, if its partition queue's buffered size has been
                 // decreased to the threshold, we can then resume the consumption on this partition
-                if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
+                if (recordInfo.queue().size() == this.maxBufferedSize) {
                     consumer.resume(singleton(partition));
                     requiresPoll = true;
                 }
@@ -320,13 +320,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
     @SuppressWarnings("unchecked")
     public <K, V> void forward(K key, V value) {
         ProcessorNode thisNode = currNode;
-        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
-            currNode = childNode;
-            try {
+        try {
+            for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+                currNode = childNode;
                 childNode.process(key, value);
-            } finally {
-                currNode = thisNode;
             }
+        } finally {
+            currNode = thisNode;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/09f4a7fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7d6b98f..c2a8e06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -350,9 +350,12 @@ public class StreamThread extends Thread {
                     requiresPoll = requiresPoll || task.requiresPoll();
 
                     sensors.processTimeSensor.record(time.milliseconds() - startProcess);
-                }
 
-                maybePunctuate();
+                    maybePunctuate(task);
+
+                    if (task.commitNeeded())
+                        commitOne(task, time.milliseconds());
+                }
 
                 // if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
                 // even when we paused all partitions.
@@ -424,18 +427,16 @@ public class StreamThread extends Thread {
         return true;
     }
 
-    private void maybePunctuate() {
-        for (StreamTask task : activeTasks.values()) {
-            try {
-                long now = time.milliseconds();
+    private void maybePunctuate(StreamTask task) {
+        try {
+            long now = time.milliseconds();
 
-                if (task.maybePunctuate(now))
-                    sensors.punctuateTimeSensor.record(time.milliseconds() - now);
+            if (task.maybePunctuate(now))
+                sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
-            } catch (KafkaException e) {
-                log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
+        } catch (KafkaException e) {
+            log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            throw e;
         }
     }
 
@@ -449,16 +450,6 @@ public class StreamThread extends Thread {
             lastCommit = now;
 
             processStandbyRecords = true;
-        } else {
-            for (StreamTask task : activeTasks.values()) {
-                try {
-                    if (task.commitNeeded())
-                        commitOne(task, time.milliseconds());
-                } catch (KafkaException e) {
-                    log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                    throw e;
-                }
-            }
         }
     }
 


[18/21] kafka git commit: KAFKA-3384: Conform to POSIX kill usage

Posted by gw...@apache.org.
KAFKA-3384: Conform to POSIX kill usage

I believe this addresses KAFKA-3384.

The POSIX kill manpage is at http://pubs.opengroup.org/onlinepubs/9699919799/utilities/kill.html

Author: Matt McClure <ml...@aya.yale.edu>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1148 from matthewlmcclure/KAFKA-3384


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1a5883c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1a5883c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1a5883c

Branch: refs/heads/0.10.0
Commit: d1a5883c8ad69fbd16c3dc03ff05db887580ded5
Parents: c36268f
Author: Matt McClure <ml...@aya.yale.edu>
Authored: Mon Apr 4 22:07:20 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 bin/kafka-server-stop.sh     | 2 +-
 bin/zookeeper-server-stop.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1a5883c/bin/kafka-server-stop.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-server-stop.sh b/bin/kafka-server-stop.sh
index f75ded7..d3c660c 100755
--- a/bin/kafka-server-stop.sh
+++ b/bin/kafka-server-stop.sh
@@ -19,6 +19,6 @@ if [ -z "$PIDS" ]; then
   echo "No kafka server to stop"
   exit 1
 else 
-  kill -SIGTERM $PIDS
+  kill -s TERM $PIDS
 fi
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1a5883c/bin/zookeeper-server-stop.sh
----------------------------------------------------------------------
diff --git a/bin/zookeeper-server-stop.sh b/bin/zookeeper-server-stop.sh
index 07c7910..f771064 100755
--- a/bin/zookeeper-server-stop.sh
+++ b/bin/zookeeper-server-stop.sh
@@ -19,6 +19,6 @@ if [ -z "$PIDS" ]; then
   echo "No zookeeper server to stop"
   exit 1
 else
-  kill -SIGTERM $PIDS
+  kill -s TERM $PIDS
 fi
 


[15/21] kafka git commit: KAFKA-3464: Add system tests for Connect with Kafka security enabled

Posted by gw...@apache.org.
KAFKA-3464: Add system tests for Connect with Kafka security enabled

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Ismael Juma, Gwen Shapira

Closes #1141 from ewencp/kafka-3464-connect-security-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98978139
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98978139
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98978139

Branch: refs/heads/0.10.0
Commit: 9897813957c1b5cd70a0dd02094f184f73e06979
Parents: 80ba01e
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Mon Apr 4 18:49:29 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py             |  6 ++++
 tests/kafkatest/services/mirror_maker.py        |  1 -
 .../services/security/security_config.py        | 24 ++++++++-----
 .../tests/connect/connect_distributed_test.py   | 36 +++++++++++++++----
 tests/kafkatest/tests/connect/connect_test.py   | 37 +++++++++++++++-----
 .../templates/connect-distributed.properties    |  7 ++--
 .../templates/connect-standalone.properties     |  4 ++-
 7 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 76336e1..51dade3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -48,6 +48,7 @@ class ConnectServiceBase(Service):
     def __init__(self, context, num_nodes, kafka, files):
         super(ConnectServiceBase, self).__init__(context, num_nodes)
         self.kafka = kafka
+        self.security_config = kafka.security_config.client_config()
         self.files = files
 
     def pids(self, node):
@@ -89,6 +90,7 @@ class ConnectServiceBase(Service):
 
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
+        self.security_config.clean_node(node)
         node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
 
     def config_filenames(self):
@@ -153,6 +155,7 @@ class ConnectStandaloneService(ConnectServiceBase):
 
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
         cmd += " ".join(connector_configs)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
@@ -161,6 +164,7 @@ class ConnectStandaloneService(ConnectServiceBase):
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
+        self.security_config.setup_node(node)
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         remote_connector_configs = []
@@ -190,6 +194,7 @@ class ConnectDistributedService(ConnectServiceBase):
 
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
         return cmd
@@ -197,6 +202,7 @@ class ConnectDistributedService(ConnectServiceBase):
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
+        self.security_config.setup_node(node)
         node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         if self.connector_config_templates:

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
index 4386788..cb4b2c1 100644
--- a/tests/kafkatest/services/mirror_maker.py
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -1,4 +1,3 @@
-
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/services/security/security_config.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index b5efba8..1bbabd2 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -17,6 +17,7 @@ import os
 import subprocess
 from ducktape.template import TemplateRenderer
 from kafkatest.services.security.minikdc import MiniKdc
+import itertools
 
 class Keytool(object):
 
@@ -172,17 +173,22 @@ class SecurityConfig(TemplateRenderer):
         else:
             return ""
 
-    def __str__(self):
+    def props(self, prefix=''):
         """
-        Return properties as string with line separators.
+        Return properties as string with line separators, optionally with a prefix.
         This is used to append security config properties to
         a properties file.
+        :param prefix: prefix to add to each property
+        :return: a string containing line-separated properties
         """
+        if self.security_protocol == SecurityConfig.PLAINTEXT:
+            return ""
+        config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
+        # Extra blank lines ensure this can be appended/prepended safely
+        return "\n".join(itertools.chain([""], config_lines, [""]))
 
-        prop_str = ""
-        if self.security_protocol != SecurityConfig.PLAINTEXT:
-            for key, value in self.properties.items():
-                prop_str += ("\n" + key + "=" + value)
-            prop_str += "\n"
-        return prop_str
-
+    def __str__(self):
+        """
+        Return properties as a string with line separators.
+        """
+        return self.props()

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 9aa16ab..698a827 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -13,15 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 import subprocess, itertools, time
 from collections import Counter
 
-class ConnectDistributedTest(KafkaTest):
+class ConnectDistributedTest(Test):
     """
     Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
     another, validating the total output is identical to the input.
@@ -45,22 +49,39 @@ class ConnectDistributedTest(KafkaTest):
     SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
-        super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+        super(ConnectDistributedTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
             'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
 
-        self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
-        self.cc.log_level = "DEBUG"
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def test_file_source_and_sink(self):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
+        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+                                  security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+                                  topics=self.topics)
+
+        self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc.log_level = "DEBUG"
+
+        self.zk.start()
+        self.kafka.start()
+
+
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    def test_file_source_and_sink(self, security_protocol):
         """
         Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
         """
 
+        self.setup_services(security_protocol=security_protocol)
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
         self.cc.start()
@@ -94,6 +115,7 @@ class ConnectDistributedTest(KafkaTest):
         """
         num_tasks = 3
 
+        self.setup_services()
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
         self.cc.start()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 90f219a..7b57402 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -13,14 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.tests.kafka_test import KafkaTest
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 from kafkatest.services.connect import ConnectStandaloneService
 from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
 import hashlib, subprocess, json
 
-class ConnectStandaloneFileTest(KafkaTest):
+class ConnectStandaloneFileTest(Test):
     """
     Simple test of Kafka Connect that produces data from a file in one
     standalone process and consumes it on another, validating the output is
@@ -42,24 +46,39 @@ class ConnectStandaloneFileTest(KafkaTest):
     SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
-        super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+        super(ConnectStandaloneFileTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
             'test' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        }
 
-        self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
-        self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
-        self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
+        self.zk = ZookeeperService(test_context, self.num_zk)
 
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
     @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
     @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
-    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
+    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
+    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
         assert converter != None, "converter type must be set"
         # Template parameters
         self.key_converter = converter
         self.value_converter = converter
         self.schemas = schemas
 
+        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+                                  security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
+                                  topics=self.topics)
+
+        self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
+        self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
+        self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
+                                                  consumer_timeout_ms=1000, new_consumer=True)
+
+
+        self.zk.start()
+        self.kafka.start()
+
         self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
         self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
index 7a7440a..48f5f78 100644
--- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties
@@ -13,7 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props() }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
 
 group.id={{ group|default("connect-cluster") }}
 
@@ -43,4 +46,4 @@ rest.advertised.host.name = {{ node.account.hostname }}
 
 # Reduce session timeouts so tests that kill workers don't need to wait as long to recover
 session.timeout.ms=10000
-consumer.session.timeout.ms=10000
\ No newline at end of file
+consumer.session.timeout.ms=10000

http://git-wip-us.apache.org/repos/asf/kafka/blob/98978139/tests/kafkatest/tests/connect/templates/connect-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
index bf1daf7..09c6487 100644
--- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-bootstrap.servers={{ kafka.bootstrap_servers() }}
+bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
+{{ kafka.security_config.client_config().props("producer.") }}
+{{ kafka.security_config.client_config().props("consumer.") }}
 
 key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
 value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}


[17/21] kafka git commit: KAFKA-3489; Update request metrics if a client closes a connection while the broker response is in flight

Posted by gw...@apache.org.
KAFKA-3489; Update request metrics if a client closes a connection while the broker response is in flight

I also fixed a few issues in `SocketServerTest` and included a few clean-ups.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #1172 from ijuma/kafka-3489-update-request-metrics-if-client-closes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e733d8c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e733d8c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e733d8c2

Branch: refs/heads/0.10.0
Commit: e733d8c2fbcee19ee77c436e66abb29850a2f7c2
Parents: 7030148
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Apr 5 18:16:48 2016 -0400
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/network/Selector.java   |   6 +-
 .../scala/kafka/network/RequestChannel.scala    |  53 +++---
 .../main/scala/kafka/network/SocketServer.scala | 185 +++++++++++--------
 .../unit/kafka/network/SocketServerTest.scala   | 141 +++++++++++---
 4 files changed, 257 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e733d8c2/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 698b99c..c333741 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -491,7 +491,7 @@ public class Selector implements Selectable {
     private KafkaChannel channelOrFail(String id) {
         KafkaChannel channel = this.channels.get(id);
         if (channel == null)
-            throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet().toString());
+            throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + channels.keySet());
         return channel;
     }
 
@@ -551,7 +551,7 @@ public class Selector implements Selectable {
      * checks if there are any staged receives and adds to completedReceives
      */
     private void addToCompletedReceives() {
-        if (this.stagedReceives.size() > 0) {
+        if (!this.stagedReceives.isEmpty()) {
             Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
             while (iter.hasNext()) {
                 Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
@@ -561,7 +561,7 @@ public class Selector implements Selectable {
                     NetworkReceive networkReceive = deque.poll();
                     this.completedReceives.add(networkReceive);
                     this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
-                    if (deque.size() == 0)
+                    if (deque.isEmpty())
                         iter.remove();
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e733d8c2/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1105802..17c5b9b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -117,36 +117,39 @@ object RequestChannel extends Logging {
       if (apiRemoteCompleteTimeMs < 0)
         apiRemoteCompleteTimeMs = responseCompleteTimeMs
 
-      val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
-      val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
-      val apiRemoteTime = (apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
-      val apiThrottleTime = (responseCompleteTimeMs - apiRemoteCompleteTimeMs).max(0L)
-      val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
-      val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
+      val requestQueueTime = math.max(requestDequeueTimeMs - startTimeMs, 0)
+      val apiLocalTime = math.max(apiLocalCompleteTimeMs - requestDequeueTimeMs, 0)
+      val apiRemoteTime = math.max(apiRemoteCompleteTimeMs - apiLocalCompleteTimeMs, 0)
+      val apiThrottleTime = math.max(responseCompleteTimeMs - apiRemoteCompleteTimeMs, 0)
+      val responseQueueTime = math.max(responseDequeueTimeMs - responseCompleteTimeMs, 0)
+      val responseSendTime = math.max(endTimeMs - responseDequeueTimeMs, 0)
       val totalTime = endTimeMs - startTimeMs
-      var metricsList = List(RequestMetrics.metricsMap(ApiKeys.forId(requestId).name))
-      if (requestId == ApiKeys.FETCH.id) {
-        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
-        metricsList ::= ( if (isFromFollower)
-                            RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
-                          else
-                            RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
-      }
-      metricsList.foreach{
-        m => m.requestRate.mark()
-             m.requestQueueTimeHist.update(requestQueueTime)
-             m.localTimeHist.update(apiLocalTime)
-             m.remoteTimeHist.update(apiRemoteTime)
-             m.throttleTimeHist.update(apiThrottleTime)
-             m.responseQueueTimeHist.update(responseQueueTime)
-             m.responseSendTimeHist.update(responseSendTime)
-             m.totalTimeHist.update(totalTime)
+      val fetchMetricNames =
+        if (requestId == ApiKeys.FETCH.id) {
+          val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+          Seq(
+            if (isFromFollower) RequestMetrics.followFetchMetricName
+            else RequestMetrics.consumerFetchMetricName
+          )
+        }
+        else Seq.empty
+      val metricNames = fetchMetricNames :+ ApiKeys.forId(requestId).name
+      metricNames.foreach { metricName =>
+        val m = RequestMetrics.metricsMap(metricName)
+        m.requestRate.mark()
+        m.requestQueueTimeHist.update(requestQueueTime)
+        m.localTimeHist.update(apiLocalTime)
+        m.remoteTimeHist.update(apiRemoteTime)
+        m.throttleTimeHist.update(apiThrottleTime)
+        m.responseQueueTimeHist.update(responseQueueTime)
+        m.responseSendTimeHist.update(responseSendTime)
+        m.totalTimeHist.update(totalTime)
       }
 
-      if(requestLogger.isTraceEnabled)
+      if (requestLogger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
           .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
-      else if(requestLogger.isDebugEnabled)
+      else if (requestLogger.isDebugEnabled)
         requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
           .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e733d8c2/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 5c31ac6..f1ec2ef 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -31,9 +31,8 @@ import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{Selector => KSelector, LoginType, Mode, ChannelBuilders}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, LoginType, Mode, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -41,7 +40,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection._
 import JavaConverters._
-import scala.util.control.{NonFatal, ControlThrowable}
+import scala.util.control.{ControlThrowable, NonFatal}
 
 /**
  * An NIO socket server. The threading model is
@@ -83,8 +82,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
       val sendBufferSize = config.socketSendBufferBytes
       val recvBufferSize = config.socketReceiveBufferBytes
-      val maxRequestSize = config.socketRequestMaxBytes
-      val connectionsMaxIdleMs = config.connectionsMaxIdleMs
       val brokerId = config.brokerId
 
       var processorBeginIndex = 0
@@ -92,18 +89,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         val protocol = endpoint.protocolType
         val processorEndIndex = processorBeginIndex + numProcessorThreads
 
-        for (i <- processorBeginIndex until processorEndIndex) {
-          processors(i) = new Processor(i,
-            time,
-            maxRequestSize,
-            requestChannel,
-            connectionQuotas,
-            connectionsMaxIdleMs,
-            protocol,
-            config.values,
-            metrics
-          )
-        }
+        for (i <- processorBeginIndex until processorEndIndex)
+          processors(i) = newProcessor(i, connectionQuotas, protocol)
 
         val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
           processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
@@ -148,10 +135,27 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     }
   }
 
+  /* `protected` for test usage */
+  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
+    new Processor(id,
+      time,
+      config.socketRequestMaxBytes,
+      requestChannel,
+      connectionQuotas,
+      config.connectionsMaxIdleMs,
+      protocol,
+      config.values,
+      metrics
+    )
+  }
+
   /* For test usage */
   private[network] def connectionCount(address: InetAddress): Int =
     Option(connectionQuotas).fold(0)(_.get(address))
 
+  /* For test usage */
+  private[network] def processor(index: Int): Processor = processors(index)
+
 }
 
 /**
@@ -376,10 +380,7 @@ private[kafka] class Processor(val id: Int,
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
   private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
-  private val channelBuilder = ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs)
-  private val metricTags = new util.HashMap[String, String]()
-  metricTags.put("networkProcessor", id.toString)
-
+  private val metricTags = Map("networkProcessor" -> id.toString).asJava
 
   newGauge("IdlePercent",
     new Gauge[Double] {
@@ -398,65 +399,27 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
-    channelBuilder)
+    ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs))
 
   override def run() {
     startupComplete()
-    while(isRunning) {
+    while (isRunning) {
       try {
         // setup any new connections that have been queued up
         configureNewConnections()
         // register any new responses for writing
         processNewResponses()
-
-        try {
-          selector.poll(300)
-        } catch {
-          case e @ (_: IllegalStateException | _: IOException) =>
-            error("Closing processor %s due to illegal state or IO exception".format(id))
-            swallow(closeAll())
-            shutdownComplete()
-            throw e
-        }
-        selector.completedReceives.asScala.foreach { receive =>
-          try {
-            val channel = selector.channel(receive.source)
-            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
-              channel.socketAddress)
-            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
-            requestChannel.sendRequest(req)
-            selector.mute(receive.source)
-          } catch {
-            case e @ (_: InvalidRequestException | _: SchemaException) =>
-              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
-              error("Closing socket for " + receive.source + " because of error", e)
-              close(selector, receive.source)
-          }
-        }
-
-        selector.completedSends.asScala.foreach { send =>
-          val resp = inflightResponses.remove(send.destination).getOrElse {
-            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
-          }
-          resp.request.updateRequestMetrics()
-          selector.unmute(send.destination)
-        }
-
-        selector.disconnected.asScala.foreach { connectionId =>
-          val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
-            throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
-          }.remoteHost
-          // the channel has been closed by the selector but the quotas still need to be updated
-          connectionQuotas.dec(InetAddress.getByName(remoteHost))
-        }
-
+        poll()
+        processCompletedReceives()
+        processCompletedSends()
+        processDisconnected()
       } catch {
         // We catch all the throwables here to prevent the processor thread from exiting. We do this because
-        // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
+        // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
         // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
         // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
-        case e : ControlThrowable => throw e
-        case e : Throwable =>
+        case e: ControlThrowable => throw e
+        case e: Throwable =>
           error("Processor got uncaught exception.", e)
       }
     }
@@ -468,7 +431,7 @@ private[kafka] class Processor(val id: Int,
 
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
-    while(curr != null) {
+    while (curr != null) {
       try {
         curr.responseAction match {
           case RequestChannel.NoOpAction =>
@@ -478,9 +441,7 @@ private[kafka] class Processor(val id: Int,
             trace("Socket server received empty response to send, registering for read: " + curr)
             selector.unmute(curr.request.connectionId)
           case RequestChannel.SendAction =>
-            trace("Socket server received response to send, registering for write and sending data: " + curr)
-            selector.send(curr.responseSend)
-            inflightResponses += (curr.request.connectionId -> curr)
+            sendResponse(curr)
           case RequestChannel.CloseConnectionAction =>
             curr.request.updateRequestMetrics
             trace("Closing socket connection actively according to the response code.")
@@ -492,6 +453,71 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
+  /* `protected` for test usage */
+  protected[network] def sendResponse(response: RequestChannel.Response) {
+    trace(s"Socket server received response to send, registering for write and sending data: $response")
+    val channel = selector.channel(response.responseSend.destination)
+    // `channel` can be null if the selector closed the connection because it was idle for too long
+    if (channel == null) {
+      warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
+      response.request.updateRequestMetrics()
+    }
+    else {
+      selector.send(response.responseSend)
+      inflightResponses += (response.request.connectionId -> response)
+    }
+  }
+
+  private def poll() {
+    try selector.poll(300)
+    catch {
+      case e @ (_: IllegalStateException | _: IOException) =>
+        error(s"Closing processor $id due to illegal state or IO exception")
+        swallow(closeAll())
+        shutdownComplete()
+        throw e
+    }
+  }
+
+  private def processCompletedReceives() {
+    selector.completedReceives.asScala.foreach { receive =>
+      try {
+        val channel = selector.channel(receive.source)
+        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
+          channel.socketAddress)
+        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
+        requestChannel.sendRequest(req)
+        selector.mute(receive.source)
+      } catch {
+        case e @ (_: InvalidRequestException | _: SchemaException) =>
+          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
+          error(s"Closing socket for ${receive.source} because of error", e)
+          close(selector, receive.source)
+      }
+    }
+  }
+
+  private def processCompletedSends() {
+    selector.completedSends.asScala.foreach { send =>
+      val resp = inflightResponses.remove(send.destination).getOrElse {
+        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
+      }
+      resp.request.updateRequestMetrics()
+      selector.unmute(send.destination)
+    }
+  }
+
+  private def processDisconnected() {
+    selector.disconnected.asScala.foreach { connectionId =>
+      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
+        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
+      }.remoteHost
+      inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
+      // the channel has been closed by the selector but the quotas still need to be updated
+      connectionQuotas.dec(InetAddress.getByName(remoteHost))
+    }
+  }
+
   /**
    * Queue up a new connection for reading
    */
@@ -504,10 +530,10 @@ private[kafka] class Processor(val id: Int,
    * Register any new connections that have been queued up
    */
   private def configureNewConnections() {
-    while(!newConnections.isEmpty) {
+    while (!newConnections.isEmpty) {
       val channel = newConnections.poll()
       try {
-        debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
+        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
         val localHost = channel.socket().getLocalAddress.getHostAddress
         val localPort = channel.socket().getLocalPort
         val remoteHost = channel.socket().getInetAddress.getHostAddress
@@ -515,12 +541,12 @@ private[kafka] class Processor(val id: Int,
         val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
         selector.register(connectionId, channel)
       } catch {
-        // We explicitly catch all non fatal exceptions and close the socket to avoid socket leak. The other
-        // throwables will be caught in processor and logged as uncaught exception.
+        // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
+        // throwables will be caught in processor and logged as uncaught exceptions.
         case NonFatal(e) =>
-          // need to close the channel here to avoid socket leak.
+          // need to close the channel here to avoid a socket leak.
           close(channel)
-          error("Processor " + id + " closed connection from " + channel.getRemoteAddress, e)
+          error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
       }
     }
   }
@@ -535,6 +561,9 @@ private[kafka] class Processor(val id: Int,
     selector.close()
   }
 
+  /* For test usage */
+  private[network] def channel(connectionId: String): Option[KafkaChannel] =
+    Option(selector.channel(connectionId))
 
   /**
    * Wakeup the thread for selection.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e733d8c2/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5d28894..81e5232 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -39,7 +39,7 @@ import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection.Map
+import scala.collection.mutable.ArrayBuffer
 
 class SocketServerTest extends JUnitSuite {
   val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
@@ -55,6 +55,7 @@ class SocketServerTest extends JUnitSuite {
   val metrics = new Metrics
   val server = new SocketServer(config, metrics, new SystemTime)
   server.startup()
+  val sockets = new ArrayBuffer[Socket]
 
   def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
@@ -79,7 +80,12 @@ class SocketServerTest extends JUnitSuite {
 
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
-    val request = channel.receiveRequest
+    val request = channel.receiveRequest(2000)
+    assertNotNull("receiveRequest timed out", request)
+    processRequest(channel, request)
+  }
+
+  def processRequest(channel: RequestChannel, request: RequestChannel.Request) {
     val byteBuffer = ByteBuffer.allocate(request.header.sizeOf + request.body.sizeOf)
     request.header.writeTo(byteBuffer)
     request.body.writeTo(byteBuffer)
@@ -89,13 +95,18 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
-  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) =
-    new Socket("localhost", server.boundPort(protocol))
+  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
+    val socket = new Socket("localhost", s.boundPort(protocol))
+    sockets += socket
+    socket
+  }
 
   @After
-  def cleanup() {
+  def tearDown() {
     metrics.close()
     server.shutdown()
+    sockets.foreach(_.close())
+    sockets.clear()
   }
 
   private def producerRequestBytes: Array[Byte] = {
@@ -183,7 +194,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testMaxConnectionsPerIp() {
-    // make the maximum allowable number of connections and then leak them
+    // make the maximum allowable number of connections
     val conns = (0 until server.config.maxConnectionsPerIp).map(_ => connect())
     // now try one more (should fail)
     val conn = connect()
@@ -201,27 +212,30 @@ class SocketServerTest extends JUnitSuite {
     sendRequest(conn2, serializedBytes)
     val request = server.requestChannel.receiveRequest(2000)
     assertNotNull(request)
-    conn2.close()
-    conns.tail.foreach(_.close())
   }
 
   @Test
-  def testMaxConnectionsPerIPOverrides() {
-    val overrideNum = 6
-    val overrides = Map("localhost" -> overrideNum)
+  def testMaxConnectionsPerIpOverrides() {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
     val serverMetrics = new Metrics()
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime())
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime())
     try {
       overrideServer.startup()
-      // make the maximum allowable number of connections and then leak them
-      val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
+      // make the maximum allowable number of connections
+      val conns = (0 until overrideNum).map(_ => connect(overrideServer))
+
+      // it should succeed
+      val serializedBytes = producerRequestBytes
+      sendRequest(conns.last, serializedBytes)
+      val request = overrideServer.requestChannel.receiveRequest(2000)
+      assertNotNull(request)
+
       // now try one more (should fail)
       val conn = connect(overrideServer)
       conn.setSoTimeout(3000)
       assertEquals(-1, conn.getInputStream.read())
-      conn.close()
-      conns.foreach(_.close())
     } finally {
       overrideServer.shutdown()
       serverMetrics.close()
@@ -229,16 +243,16 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
-  def testSslSocketServer(): Unit = {
+  def testSslSocketServer() {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
     val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
       trustStoreFile = Some(trustStoreFile))
     overrideProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
 
     val serverMetrics = new Metrics
-    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime)
-    overrideServer.startup()
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, new SystemTime)
     try {
+      overrideServer.startup()
       val sslContext = SSLContext.getInstance("TLSv1.2")
       sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
       val socketFactory = sslContext.getSocketFactory
@@ -271,12 +285,95 @@ class SocketServerTest extends JUnitSuite {
   }
 
   @Test
-  def testSessionPrincipal(): Unit = {
+  def testSessionPrincipal() {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
-    assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest().session.principal)
-    socket.close()
+    assertEquals(KafkaPrincipal.ANONYMOUS, server.requestChannel.receiveRequest(2000).session.principal)
+  }
+
+  /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
+  @Test
+  def testClientDisconnectionUpdatesRequestMetrics() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    val serverMetrics = new Metrics
+    var conn: Socket = null
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime) {
+      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
+        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+          config.connectionsMaxIdleMs, protocol, config.values, metrics) {
+          override protected[network] def sendResponse(response: RequestChannel.Response) {
+            conn.close()
+            super.sendResponse(response)
+          }
+        }
+      }
+    }
+    try {
+      overrideServer.startup()
+      conn = connect(overrideServer)
+      val serializedBytes = producerRequestBytes
+      sendRequest(conn, serializedBytes)
+
+      val channel = overrideServer.requestChannel
+      val request = channel.receiveRequest(2000)
+
+      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
+      def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
+      val expectedTotalTimeCount = totalTimeHistCount() + 1
+
+      // send a large buffer to ensure that the broker detects the client disconnection while writing to the socket channel.
+      // On Mac OS X, the initial write seems to always succeed and it is able to write up to 102400 bytes on the initial
+      // write. If the buffer is smaller than this, the write is considered complete and the disconnection is not
+      // detected. If the buffer is larger than 102400 bytes, a second write is attempted and it fails with an
+      // IOException.
+      val send = new NetworkSend(request.connectionId, ByteBuffer.allocate(550000))
+      channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
+      TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
+        s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
+
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
+  }
+
+  /*
+   * Test that we update request metrics if the channel has been removed from the selector when the broker calls
+   * `selector.send` (selector closes old connections, for example).
+   */
+  @Test
+  def testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    props.setProperty(KafkaConfig.ConnectionsMaxIdleMsProp, "100")
+    val serverMetrics = new Metrics
+    var conn: Socket = null
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, new SystemTime)
+    try {
+      overrideServer.startup()
+      conn = connect(overrideServer)
+      val serializedBytes = producerRequestBytes
+      sendRequest(conn, serializedBytes)
+      val channel = overrideServer.requestChannel
+      val request = channel.receiveRequest(2000)
+
+      TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.connectionId).isEmpty,
+        s"Idle connection `${request.connectionId}` was not closed by selector")
+
+      val requestMetrics = RequestMetrics.metricsMap(ApiKeys.forId(request.requestId).name)
+      def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
+      val expectedTotalTimeCount = totalTimeHistCount() + 1
+
+      processRequest(channel, request)
+
+      TestUtils.waitUntilTrue(() => totalTimeHistCount() == expectedTotalTimeCount,
+        s"request metrics not updated, expected: $expectedTotalTimeCount, actual: ${totalTimeHistCount()}")
+
+    } finally {
+      overrideServer.shutdown()
+      serverMetrics.close()
+    }
+
   }
 
 }


[06/21] kafka git commit: MINOR: update new version in additional places

Posted by gw...@apache.org.
MINOR: update new version in additional places

Note: This goes only to trunk. 0.10.0 branch will need a separate PR with different versions.

Author: Gwen Shapira <cs...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1109 from gwenshap/minor-fix-version-trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c5af253
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c5af253
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c5af253

Branch: refs/heads/0.10.0
Commit: 9c5af253b170e2570ba2e17b4155520cea474e81
Parents: bc47e23
Author: Gwen Shapira <cs...@gmail.com>
Authored: Mon Mar 21 13:40:59 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 kafka-merge-pr.py           | 2 +-
 tests/kafkatest/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c5af253/kafka-merge-pr.py
----------------------------------------------------------------------
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index e124105..f26a0a9 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -72,7 +72,7 @@ RELEASE_BRANCH_PREFIX = "0."
 
 DEV_BRANCH_NAME = "trunk"
 
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.1.0")
 
 def get_json(url):
     try:

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c5af253/tests/kafkatest/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index df1a612..10163a0 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -23,4 +23,4 @@
 # Instead, in trunk, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
-__version__ = '0.10.0.0.dev0'
+__version__ = '0.10.1.0.dev0'


[13/21] kafka git commit: KAFKA-3495; NetworkClient.blockingSendAndReceive` should rely on requestTimeout

Posted by gw...@apache.org.
KAFKA-3495; NetworkClient.blockingSendAndReceive` should rely on requestTimeout

Also removed the code for handling negative timeouts in `blockingReady` as `Selector.poll` has not supported that for a while.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #1177 from ijuma/kafka-3495-blocking-send-and-receive-request-timeout


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/625c516e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/625c516e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/625c516e

Branch: refs/heads/0.10.0
Commit: 625c516e0e4096c6bccde998d5973525b8be196d
Parents: eb08e49
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Apr 3 16:34:46 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  2 +-
 .../controller/ControllerChannelManager.scala   |  4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  9 +--
 .../kafka/server/ReplicaFetcherThread.scala     |  4 +-
 .../kafka/utils/NetworkClientBlockingOps.scala  | 66 +++++++++++---------
 5 files changed, 40 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/625c516e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4d01cde..d22b508 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -390,7 +390,7 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
-     * Iterate over all the inflight requests and expire any requests that have exceeded the configured the requestTimeout.
+     * Iterate over all the inflight requests and expire any requests that have exceeded the configured requestTimeout.
      * The connection to the node associated with the request will be terminated and will be treated as a disconnection.
      *
      * @param responses The list of responses to update

http://git-wip-us.apache.org/repos/asf/kafka/blob/625c516e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b376d15..e9731fd 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -178,9 +178,7 @@ class RequestSendThread(val controllerId: Int,
               val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
               val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
               val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
-              clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)(time).getOrElse {
-                throw new SocketTimeoutException(s"No response received within $socketTimeoutMs ms")
-              }
+              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
               isSendSuccessful = true
             }
           } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/625c516e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e29494b..f998d82 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -320,9 +320,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
     val socketTimeoutMs = config.controllerSocketTimeoutMs
 
-    def socketTimeoutException: Throwable =
-      new SocketTimeoutException(s"Did not receive response within $socketTimeoutMs")
-
     def networkClientControlledShutdown(retries: Int): Boolean = {
       val metadataUpdater = new ManualMetadataUpdater()
       val networkClient = {
@@ -388,16 +385,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
             try {
 
               if (!networkClient.blockingReady(node(prevController), socketTimeoutMs))
-                throw socketTimeoutException
+                throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
               // send the controlled shutdown request
               val requestHeader = networkClient.nextRequestHeader(ApiKeys.CONTROLLED_SHUTDOWN_KEY)
               val send = new RequestSend(node(prevController).idString, requestHeader,
                 new ControlledShutdownRequest(config.brokerId).toStruct)
               val request = new ClientRequest(kafkaMetricsTime.milliseconds(), true, send, null)
-              val clientResponse = networkClient.blockingSendAndReceive(request, socketTimeoutMs).getOrElse {
-                throw socketTimeoutException
-              }
+              val clientResponse = networkClient.blockingSendAndReceive(request)
 
               val shutdownResponse = new ControlledShutdownResponse(clientResponse.responseBody)
               if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/625c516e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index de7269f..26838ca 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -233,9 +233,7 @@ class ReplicaFetcherThread(name: String,
       else {
         val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct)
         val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
-        networkClient.blockingSendAndReceive(clientRequest, socketTimeout)(time).getOrElse {
-          throw new SocketTimeoutException(s"No response received within $socketTimeout ms")
-        }
+        networkClient.blockingSendAndReceive(clientRequest)(time)
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/625c516e/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index 9ed9d29..fd4af6e 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -55,6 +55,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * care.
    */
   def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
+    require(timeout >=0, "timeout should be >= 0")
     client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
       if (client.isReady(node, now))
         true
@@ -65,19 +66,18 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
   }
 
   /**
-   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received,
-   * the timeout expires or a disconnection happens.
+   * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
+   * disconnection happens (which can happen for a number of reasons including a request timeout).
    *
-   * It returns `true` if the call completes normally or `false` if the timeout expires. In the case of a disconnection,
-   * an `IOException` is thrown instead.
+   * In case of a disconnection, an `IOException` is thrown.
    *
    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
-  def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
+  def blockingSendAndReceive(request: ClientRequest)(implicit time: JTime): ClientResponse = {
     client.send(request, time.milliseconds())
 
-    pollUntilFound(timeout) { case (responses, _) =>
+    pollContinuously { responses =>
       val response = responses.find { response =>
         response.request.request.header.correlationId == request.request.header.correlationId
       }
@@ -102,41 +102,45 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * care.
    */
   private def pollUntil(timeout: Long)(predicate: (Seq[ClientResponse], Long) => Boolean)(implicit time: JTime): Boolean = {
-    pollUntilFound(timeout) { (responses, now) =>
-      if (predicate(responses, now)) Some(true)
-      else None
-    }.fold(false)(_ => true)
-  }
-
-  /**
-   * Invokes `client.poll` until `collect` returns `Some` or the timeout expires.
-   *
-   * It returns the result of `collect` if the call completes normally or `None` if the timeout expires. Exceptions
-   * thrown via `collect` are not handled and will bubble up.
-   *
-   * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
-   * care.
-   */
-  private def pollUntilFound[T](timeout: Long)(collect: (Seq[ClientResponse], Long) => Option[T])(implicit time: JTime): Option[T] = {
-
     val methodStartTime = time.milliseconds()
     val timeoutExpiryTime = methodStartTime + timeout
 
     @tailrec
-    def recurse(iterationStartTime: Long): Option[T] = {
-      val pollTimeout = if (timeout < 0) timeout else timeoutExpiryTime - iterationStartTime
+    def recursivePoll(iterationStartTime: Long): Boolean = {
+      val pollTimeout = timeoutExpiryTime - iterationStartTime
       val responses = client.poll(pollTimeout, iterationStartTime).asScala
-      val result = collect(responses, iterationStartTime)
-      if (result.isDefined) result
+      if (predicate(responses, iterationStartTime)) true
       else {
         val afterPollTime = time.milliseconds()
-        if (timeout < 0 || afterPollTime < timeoutExpiryTime)
-          recurse(afterPollTime)
-        else None
+        if (afterPollTime < timeoutExpiryTime) recursivePoll(afterPollTime)
+        else false
+      }
+    }
+
+    recursivePoll(methodStartTime)
+  }
+
+  /**
+    * Invokes `client.poll` until `collect` returns `Some`. The value inside `Some` is returned.
+    *
+    * Exceptions thrown via `collect` are not handled and will bubble up.
+    *
+    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
+    * care.
+    */
+  private def pollContinuously[T](collect: Seq[ClientResponse] => Option[T])(implicit time: JTime): T = {
+
+    @tailrec
+    def recursivePoll: T = {
+      // rely on request timeout to ensure we don't block forever
+      val responses = client.poll(Long.MaxValue, time.milliseconds()).asScala
+      collect(responses) match {
+        case Some(result) => result
+        case None => recursivePoll
       }
     }
 
-    recurse(methodStartTime)
+    recursivePoll
   }
 
 }


[16/21] kafka git commit: KAFKA-2998: log warnings when client is disconnected from bootstrap brokers

Posted by gw...@apache.org.
KAFKA-2998: log warnings when client is disconnected from bootstrap brokers

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Grant Henke, Guozhang Wang

Closes #769 from hachikuji/KAFKA-2998


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36268f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36268f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36268f7

Branch: refs/heads/0.10.0
Commit: c36268f77fbf7f6a47a1e09ec3e38c20173a06c5
Parents: 9897813
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Apr 4 21:28:59 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java    |  8 ++++++++
 .../main/java/org/apache/kafka/common/Cluster.java | 17 +++++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d22b508..d2eaace 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -556,6 +556,14 @@ public class NetworkClient implements KafkaClient {
             ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
 
             if (requestKey == ApiKeys.METADATA) {
+                Cluster cluster = metadata.fetch();
+                if (cluster.isBootstrapConfigured()) {
+                    int nodeId = Integer.parseInt(request.request().destination());
+                    Node node = cluster.nodeById(nodeId);
+                    if (node != null)
+                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
+                }
+
                 metadataFetchInProgress = false;
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8e85df8..e1bf581 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -29,6 +29,7 @@ import java.util.Set;
  */
 public final class Cluster {
 
+    private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -45,11 +46,19 @@ public final class Cluster {
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
+        this(false, nodes, partitions, unauthorizedTopics);
+    }
+
+    private Cluster(boolean isBootstrapConfigured,
+                    Collection<Node> nodes,
+                    Collection<PartitionInfo> partitions,
+                    Set<String> unauthorizedTopics) {
+        this.isBootstrapConfigured = isBootstrapConfigured;
+
         // make a randomized, unmodifiable copy of the nodes
         List<Node> copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);
-        
         this.nodesById = new HashMap<>();
         for (Node node : nodes)
             this.nodesById.put(node.id(), node);
@@ -115,7 +124,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
     }
 
     /**
@@ -214,6 +223,10 @@ public final class Cluster {
         return unauthorizedTopics;
     }
 
+    public boolean isBootstrapConfigured() {
+        return isBootstrapConfigured;
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";


[08/21] kafka git commit: KAFKA-3407 - ErrorLoggingCallback trims helpful diagnostic information.

Posted by gw...@apache.org.
KAFKA-3407 - ErrorLoggingCallback trims helpful diagnostic information.

This should help when diagnosing issues with the console producer. This allows the logger to use `exception` rather than `exception.getMessage()`.

Author: Jeremy Custenborder <jc...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1079 from jcustenborder/KAFKA-3407


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ae0a5a0d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ae0a5a0d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ae0a5a0d

Branch: refs/heads/0.10.0
Commit: ae0a5a0dfdb6f5d69322ce0fa4da2c6e5e0daeb6
Parents: e8593d1
Author: Jeremy Custenborder <jc...@gmail.com>
Authored: Thu Mar 24 11:03:30 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/ErrorLoggingCallback.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ae0a5a0d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
index 747e29f..18088c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ErrorLoggingCallback.java
@@ -44,8 +44,8 @@ public class ErrorLoggingCallback implements Callback {
                     logAsString ? new String(key) : key.length + " bytes";
             String valueString = (valueLength == -1) ? "null" :
                     logAsString ? new String(value) : valueLength + " bytes";
-            log.error("Error when sending message to topic {} with key: {}, value: {} with error: {}",
-                    topic, keyString, valueString, e.getMessage());
+            log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
+                    topic, keyString, valueString, e);
         }
     }
 }


[19/21] kafka git commit: KAFKA-3508: Fix transient SimpleACLAuthorizerTest failures

Posted by gw...@apache.org.
KAFKA-3508: Fix transient SimpleACLAuthorizerTest failures

Allows the the maximum retires when writing to zookeeper to be overridden in tests and sets the value to Int.MaxValue to avoid transient failure.

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1156 from granthenke/transient-acl-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35fadbf6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35fadbf6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35fadbf6

Branch: refs/heads/0.10.0
Commit: 35fadbf639650a14f061a97904755d12499fd7fa
Parents: e733d8c
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Apr 5 15:17:46 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/security/auth/SimpleAclAuthorizer.scala   | 2 +-
 .../unit/kafka/security/auth/SimpleAclAuthorizerTest.scala     | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/35fadbf6/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 1a06af2..18fff45 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -79,7 +79,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   // The maximum number of times we should try to update the resource acls in zookeeper before failing;
   // This should never occur, but is a safeguard just in case.
-  private val maxUpdateRetries = 10
+  protected[auth] var maxUpdateRetries = 10
 
   private val retryBackoffMs = 100
   private val retryBackoffJitterMs = 50

http://git-wip-us.apache.org/repos/asf/kafka/blob/35fadbf6/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index bdadb15..7fcc33d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -44,6 +44,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
 
+    // Increase maxUpdateRetries to avoid transient failures
+    simpleAclAuthorizer.maxUpdateRetries = Int.MaxValue
+    simpleAclAuthorizer2.maxUpdateRetries = Int.MaxValue
+
     val props = TestUtils.createBrokerConfig(0, zkConnect)
     props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
 
@@ -307,7 +311,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   def testHighConcurrencyModificationOfResourceAcls() {
     val commonResource = new Resource(Topic, "test")
 
-    val acls = (0 to 100).map { i =>
+    val acls = (0 to 50).map { i =>
       val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
       new Acl(useri, Allow, WildCardHost, Read)
     }


[03/21] kafka git commit: KAFKA-3510; OffsetIndex thread safety

Posted by gw...@apache.org.
KAFKA-3510; OffsetIndex thread safety

* Make all fields accessed outside of a lock `volatile`
* Only allow mutation within the class
* Remove unnecessary `AtomicInteger` since mutation always happens inside a lock

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1188 from ijuma/kafka-3510-offset-index-thread-safety


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70301482
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70301482
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70301482

Branch: refs/heads/0.10.0
Commit: 703014824aeb0690dfb30a98fcd0f11e9d1e68fc
Parents: d1a5883
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Apr 5 11:46:04 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 173 ++++++++++---------
 .../scala/kafka/tools/DumpLogSegments.scala     |   2 +-
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  10 +-
 5 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 81c19fa..8465b64 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -215,7 +215,7 @@ class Log(val dir: File,
       val fileName = logFile.getName
       val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
       val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
-      val index =  new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
                                        index = index,
                                        baseOffset = startOffset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9fc68a4..3a4bbc8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -55,7 +55,7 @@ class LogSegment(val log: FileMessageSet,
 
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          rollJitterMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e95c9d1..ce35d68 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,7 +24,6 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks._
-import java.util.concurrent.atomic._
 import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
@@ -54,62 +53,70 @@ import kafka.common.InvalidOffsetException
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
   private val lock = new ReentrantLock
   
   /* initialize the memory mapping for this index */
-  private var mmap: MappedByteBuffer = 
-    {
-      val newlyCreated = file.createNewFile()
-      val raf = new RandomAccessFile(file, "rw")
-      try {
-        /* pre-allocate the file if necessary */
-        if(newlyCreated) {
-          if(maxIndexSize < 8)
-            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-        }
-          
-        /* memory-map the file */
-        val len = raf.length()
-        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-          
-        /* set the position in the index for the next entry */
-        if(newlyCreated)
-          idx.position(0)
-        else
-          // if this is a pre-existing index, assume it is all valid and set position to last entry
-          idx.position(roundToExactMultiple(idx.limit, 8))
-        idx
-      } finally {
-        CoreUtils.swallow(raf.close())
+  @volatile
+  private[this] var mmap: MappedByteBuffer = {
+    val newlyCreated = _file.createNewFile()
+    val raf = new RandomAccessFile(_file, "rw")
+    try {
+      /* pre-allocate the file if necessary */
+      if (newlyCreated) {
+        if (maxIndexSize < 8)
+          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
       }
+
+      /* memory-map the file */
+      val len = raf.length()
+      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+      /* set the position in the index for the next entry */
+      if (newlyCreated)
+        idx.position(0)
+      else
+        // if this is a pre-existing index, assume it is all valid and set position to last entry
+        idx.position(roundToExactMultiple(idx.limit, 8))
+      idx
+    } finally {
+      CoreUtils.swallow(raf.close())
     }
-  
+  }
+
   /* the number of eight-byte entries currently in the index */
-  private var size = new AtomicInteger(mmap.position / 8)
-  
-  /**
-   * The maximum number of eight-byte entries this index can hold
-   */
   @volatile
-  var maxEntries = mmap.limit / 8
-  
-  /* the last offset in the index */
-  var lastOffset = readLastEntry.offset
+  private[this] var _entries = mmap.position / 8
+
+  /* The maximum number of eight-byte entries this index can hold */
+  @volatile
+  private[this] var _maxEntries = mmap.limit / 8
+
+  @volatile
+  private[this] var _lastOffset = readLastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
+    .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
+
+  /** The maximum number of entries this index can hold */
+  def maxEntries: Int = _maxEntries
+
+  /** The last offset in the index */
+  def lastOffset: Long = _lastOffset
+
+  /** The index file */
+  def file: File = _file
 
   /**
    * The last entry in the index
    */
   def readLastEntry(): OffsetPosition = {
     inLock(lock) {
-      size.get match {
+      _entries match {
         case 0 => OffsetPosition(baseOffset, 0)
-        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+        case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
       }
     }
   }
@@ -149,22 +156,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     val relOffset = targetOffset - baseOffset
     
     // check if the index is empty
-    if(entries == 0)
+    if (_entries == 0)
       return -1
     
     // check if the target offset is smaller than the least offset
-    if(relativeOffset(idx, 0) > relOffset)
+    if (relativeOffset(idx, 0) > relOffset)
       return -1
       
     // binary search for the entry
     var lo = 0
-    var hi = entries-1
-    while(lo < hi) {
+    var hi = _entries - 1
+    while (lo < hi) {
       val mid = ceil(hi/2.0 + lo/2.0).toInt
       val found = relativeOffset(idx, mid)
-      if(found == relOffset)
+      if (found == relOffset)
         return mid
-      else if(found < relOffset)
+      else if (found < relOffset)
         lo = mid
       else
         hi = mid - 1
@@ -185,8 +192,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def entry(n: Int): OffsetPosition = {
     maybeLock(lock) {
-      if(n >= entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      if(n >= _entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, _entries))
       val idx = mmap.duplicate
       OffsetPosition(relativeOffset(idx, n), physical(idx, n))
     }
@@ -197,17 +204,17 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def append(offset: Long, position: Int) {
     inLock(lock) {
-      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
-      if (size.get == 0 || offset > lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
-        this.mmap.putInt((offset - baseOffset).toInt)
-        this.mmap.putInt(position)
-        this.size.incrementAndGet()
-        this.lastOffset = offset
-        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
+      if (_entries == 0 || offset > _lastOffset) {
+        debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+        mmap.putInt((offset - baseOffset).toInt)
+        mmap.putInt(position)
+        _entries += 1
+        _lastOffset = offset
+        require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
       } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, entries, lastOffset, file.getAbsolutePath))
+          .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
       }
     }
   }
@@ -215,7 +222,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /**
    * True iff there are no more slots available in this index
    */
-  def isFull: Boolean = entries >= this.maxEntries
+  def isFull: Boolean = _entries >= _maxEntries
   
   /**
    * Truncate the entire index, deleting all entries
@@ -252,9 +259,9 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   private def truncateToEntries(entries: Int) {
     inLock(lock) {
-      this.size.set(entries)
-      mmap.position(this.size.get * 8)
-      this.lastOffset = readLastEntry.offset
+      _entries = entries
+      mmap.position(_entries * 8)
+      _lastOffset = readLastEntry.offset
     }
   }
   
@@ -264,7 +271,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def trimToValidSize() {
     inLock(lock) {
-      resize(entries * 8)
+      resize(_entries * 8)
     }
   }
 
@@ -276,18 +283,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def resize(newSize: Int) {
     inLock(lock) {
-      val raf = new RandomAccessFile(file, "rw")
+      val raf = new RandomAccessFile(_file, "rw")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
-      val position = this.mmap.position
+      val position = mmap.position
       
       /* Windows won't let us modify the file length while the file is mmapped :-( */
-      if(Os.isWindows)
-        forceUnmap(this.mmap)
+      if (Os.isWindows)
+        forceUnmap(mmap)
       try {
         raf.setLength(roundedNewSize)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        this.maxEntries = this.mmap.limit / 8
-        this.mmap.position(position)
+        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        _maxEntries = mmap.limit / 8
+        mmap.position(position)
       } finally {
         CoreUtils.swallow(raf.close())
       }
@@ -319,19 +326,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Delete this index file
    */
   def delete(): Boolean = {
-    info("Deleting index " + this.file.getAbsolutePath)
-    if(Os.isWindows)
-      CoreUtils.swallow(forceUnmap(this.mmap))
-    this.file.delete()
+    info("Deleting index " + _file.getAbsolutePath)
+    if (Os.isWindows)
+      CoreUtils.swallow(forceUnmap(mmap))
+    _file.delete()
   }
   
   /** The number of entries in this index */
-  def entries() = size.get
+  def entries = _entries
   
   /**
    * The number of bytes actually used by this index
    */
-  def sizeInBytes() = 8 * entries
+  def sizeInBytes() = 8 * _entries
   
   /** Close the index */
   def close() {
@@ -343,8 +350,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally this.file = f
+    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+    finally _file = f
   }
   
   /**
@@ -352,13 +359,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @throws IllegalArgumentException if any problems are found
    */
   def sanityCheck() {
-    require(entries == 0 || lastOffset > baseOffset,
+    require(_entries == 0 || lastOffset > baseOffset,
             "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-            .format(file.getAbsolutePath, lastOffset, baseOffset))
-      val len = file.length()
-      require(len % 8 == 0, 
-              "Index file " + file.getName + " is corrupt, found " + len + 
-              " bytes which is not positive or not a multiple of 8.")
+            .format(_file.getAbsolutePath, lastOffset, baseOffset))
+    val len = _file.length()
+    require(len % 8 == 0,
+            "Index file " + _file.getName + " is corrupt, found " + len +
+            " bytes which is not positive or not a multiple of 8.")
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index e882a30..dc99672 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -124,7 +124,7 @@ object DumpLogSegments {
     val startOffset = file.getName().split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
     val messageSet = new FileMessageSet(logFile, false)
-    val index = new OffsetIndex(file = file, baseOffset = startOffset)
+    val index = new OffsetIndex(file, baseOffset = startOffset)
 
     //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
     if (indexSanityOnly) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/70301482/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index dfd7b54..869e618 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -34,7 +34,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -103,7 +103,7 @@ class OffsetIndexTest extends JUnitSuite {
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
+    val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertEquals(sec.offset, idxRo.lastOffset)
@@ -113,7 +113,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+	val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
 	idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
@@ -140,7 +140,7 @@ class OffsetIndexTest extends JUnitSuite {
     idx.append(5, 5)
     
     idx.truncate()
-    assertEquals("Full truncation should leave no entries", 0, idx.entries())
+    assertEquals("Full truncation should leave no entries", 0, idx.entries)
     idx.append(0, 0)
   }
   
@@ -169,4 +169,4 @@ class OffsetIndexTest extends JUnitSuite {
     file.delete()
     file
   }
-}
\ No newline at end of file
+}


[02/21] kafka git commit: Changing version to 0.10.0.0

Posted by gw...@apache.org.
Changing version to 0.10.0.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/050bf60a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/050bf60a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/050bf60a

Branch: refs/heads/0.10.0
Commit: 050bf60a4725b239db76732d0f7d7f1c7c41ab06
Parents: 5c5fe7b
Author: Gwen Shapira <cs...@gmail.com>
Authored: Mon Mar 21 09:48:02 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 gradle.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/050bf60a/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 0a612f6..7f30b4d 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
 group=org.apache.kafka
 # NOTE: When you change this version number, you should also make sure to update
 # the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.10.1.0-SNAPSHOT
+version=0.10.0.0
 scalaVersion=2.10.6
 task=build
 org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m