You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jg...@apache.org on 2014/07/29 00:18:22 UTC
[2/4] SAMZA-123: Move topic partition grouping to the AM and
generalize
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
new file mode 100644
index 0000000..f8a535a
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job
+
+import org.junit.Test
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.Util._
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.junit.Assert._
+
+class TestShellCommandBuilder {
+
+ @Test
+ def testJsonCreateStreamPartitionStringRoundTrip() {
+ val getPartitions: Set[SystemStreamPartition] = {
+ // Build a heavily skewed set of partitions.
+ def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
+ val system = "all-same-system."
+ val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128),
+ system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64))
+ val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap
+ val streamsMap = (lotsOfParts ++ fewParts)
+ (for(s <- streamsMap.keys;
+ part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+ }
+
+ // Group by partition...
+ val sspTaskNameMap = TaskNamesToSystemStreamPartitions(getPartitions.groupBy(p => new TaskName(p.getPartition.toString)).toMap)
+
+ val asString = ShellCommandBuilder.serializeSystemStreamPartitionSetToJSON(sspTaskNameMap.getJavaFriendlyType)
+
+ val backFromSSPTaskNameMap = TaskNamesToSystemStreamPartitions(ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(asString))
+ assertEquals(sspTaskNameMap, backFromSSPTaskNameMap)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
index 4f7ddcd..d425e86 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -21,7 +21,6 @@ package org.apache.samza.metrics
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.config.MapConfig
import grizzled.slf4j.Logging
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import java.io.IOException
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 70d8c80..0d07314 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -19,40 +19,40 @@
package org.apache.samza.serializers
+import java.util
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.system.SystemStream
-import org.apache.samza.checkpoint.Checkpoint
import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.SamzaException
-import org.apache.samza.Partition
class TestCheckpointSerde {
@Test
def testExactlyOneOffset {
val serde = new CheckpointSerde
- var offsets = Map[SystemStream, String]()
- val systemStream = new SystemStream("test-system", "test-stream")
- offsets += systemStream -> "1"
+ var offsets = Map[SystemStreamPartition, String]()
+ val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(777))
+ offsets += systemStreamPartition -> "1"
val deserializedOffsets = serde.fromBytes(serde.toBytes(new Checkpoint(offsets)))
- assertEquals("1", deserializedOffsets.getOffsets.get(systemStream))
+ assertEquals("1", deserializedOffsets.getOffsets.get(systemStreamPartition))
assertEquals(1, deserializedOffsets.getOffsets.size)
}
@Test
- def testMoreThanOneOffsetShouldFail {
- val serde = new CheckpointSerde
- var offsets = Map[SystemStream, String]()
- // Since SS != SSP with same system and stream name, this should result in
- // two offsets for one system stream in the serde.
- offsets += new SystemStream("test-system", "test-stream") -> "1"
- offsets += new SystemStreamPartition("test-system", "test-stream", new Partition(0)) -> "2"
- try {
- serde.toBytes(new Checkpoint(offsets))
- fail("Expected to fail with more than one offset for a single SystemStream.")
- } catch {
- case e: SamzaException => // expected this
- }
+ def testChangelogPartitionMappingRoundTrip {
+ val mapping = new util.HashMap[TaskName, java.lang.Integer]()
+ mapping.put(new TaskName("Ted"), 0)
+ mapping.put(new TaskName("Dougal"), 1)
+ mapping.put(new TaskName("Jack"), 2)
+
+ val checkpointSerde = new CheckpointSerde
+ val asBytes = checkpointSerde.changelogPartitionMappingToBytes(mapping)
+ val backToMap = checkpointSerde.changelogPartitionMappingFromBytes(asBytes)
+
+ assertEquals(mapping, backToMap)
+ assertNotSame(mapping, backToMap)
}
-}
\ No newline at end of file
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index d31c3ce..f505eb1 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -19,17 +19,17 @@
package org.apache.samza.system.filereader
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.AfterClass
-import java.io.PrintWriter
import java.io.File
+import java.io.FileWriter
+import java.io.PrintWriter
import org.apache.samza.Partition
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.AfterClass
+import org.junit.Assert._
+import org.junit.BeforeClass
+import org.junit.Test
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
-import org.junit.BeforeClass
-import java.io.FileWriter
object TestFileReaderSystemConsumer {
val consumer = new FileReaderSystemConsumer("file-reader", null)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index 12f1e03..7cfeb5a 100644
--- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -21,13 +21,15 @@ package org.apache.samza.task
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.Partition
import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.container.TaskName
class TestReadableCoordinator {
+ val taskName = new TaskName("P0")
+
@Test
def testCommitTask {
- val coord = new ReadableCoordinator(new Partition(0))
+ val coord = new ReadableCoordinator(taskName)
assertFalse(coord.requestedCommitTask)
assertFalse(coord.requestedCommitAll)
coord.commit(RequestScope.CURRENT_TASK)
@@ -37,7 +39,7 @@ class TestReadableCoordinator {
@Test
def testCommitAll {
- val coord = new ReadableCoordinator(new Partition(0))
+ val coord = new ReadableCoordinator(taskName)
assertFalse(coord.requestedCommitTask)
assertFalse(coord.requestedCommitAll)
coord.commit(RequestScope.ALL_TASKS_IN_CONTAINER)
@@ -47,7 +49,7 @@ class TestReadableCoordinator {
@Test
def testShutdownNow {
- val coord = new ReadableCoordinator(new Partition(0))
+ val coord = new ReadableCoordinator(taskName)
assertFalse(coord.requestedShutdownOnConsensus)
assertFalse(coord.requestedShutdownNow)
coord.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER)
@@ -57,7 +59,7 @@ class TestReadableCoordinator {
@Test
def testShutdownRequest {
- val coord = new ReadableCoordinator(new Partition(0))
+ val coord = new ReadableCoordinator(taskName)
assertFalse(coord.requestedShutdownOnConsensus)
assertFalse(coord.requestedShutdownNow)
coord.shutdown(RequestScope.CURRENT_TASK)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index ad6d2da..7c314ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -20,9 +20,13 @@ package org.apache.samza.util
import org.apache.samza.Partition
import org.apache.samza.config.Config
+import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
+import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
+import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.Util._
import org.junit.Assert._
@@ -54,48 +58,32 @@ class TestUtil {
}
@Test
- def testGetTopicPartitionsForTask() {
- def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
-
- val taskCount = 4
- val streamsMap = Map("kafka.a" -> partitionSet(4), "kafka.b" -> partitionSet(18), "timestream.c" -> partitionSet(24))
- val streamsAndParts = (for(s <- streamsMap.keys;
- part <- streamsMap.getOrElse(s, Set.empty))
- yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
+ def testResolveTaskNameToChangelogPartitionMapping {
+ def testRunner(description:String, currentTaskNames:Set[TaskName], previousTaskNameMapping:Map[TaskName, Int],
+ result:Map[TaskName, Int]) {
+ assertEquals("Failed: " + description, result,
+ Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousTaskNameMapping))
+ }
- for(i <- 0 until taskCount) {
- val result: Set[SystemStreamPartition] = Util.getStreamsAndPartitionsForContainer(i, taskCount, streamsAndParts)
- // b -> 18 % 4 = 2 therefore first two results should have an extra element
- if(i < 2) {
- assertEquals(12, result.size)
- } else {
- assertEquals(11, result.size)
- }
+ testRunner("No change between runs",
+ Set(new TaskName("Partition 0")),
+ Map(new TaskName("Partition 0") -> 0),
+ Map(new TaskName("Partition 0") -> 0))
- result.foreach(r => assertEquals(i, r.getPartition.getPartitionId % taskCount))
- }
- }
-
- @Test
- def testJsonCreateStreamPartitionStringRoundTrip() {
- val getPartitions: Set[SystemStreamPartition] = {
- // Build a heavily skewed set of partitions.
- def partitionSet(max:Int) = (0 until max).map(new Partition(_)).toSet
- val system = "all-same-system."
- val lotsOfParts = Map(system + "topic-with-many-parts-a" -> partitionSet(128),
- system + "topic-with-many-parts-b" -> partitionSet(128), system + "topic-with-many-parts-c" -> partitionSet(64))
- val fewParts = ('c' to 'z').map(l => system + l.toString -> partitionSet(4)).toMap
- val streamsMap = (lotsOfParts ++ fewParts)
- (for(s <- streamsMap.keys;
- part <- streamsMap.getOrElse(s, Set.empty)) yield new SystemStreamPartition(getSystemStreamFromNames(s), part)).toSet
- }
+ testRunner("New TaskName added, none missing this run",
+ Set(new TaskName("Partition 0"), new TaskName("Partition 1")),
+ Map(new TaskName("Partition 0") -> 0),
+ Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1))
- val streamsAndParts: Set[SystemStreamPartition] = getStreamsAndPartitionsForContainer(0, 4, getPartitions).toSet
- println(streamsAndParts)
- val asString = serializeSSPSetToJSON(streamsAndParts)
+ testRunner("New TaskName added, one missing this run",
+ Set(new TaskName("Partition 0"), new TaskName("Partition 2")),
+ Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1),
+ Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2))
- val backToStreamsAndParts = deserializeSSPSetFromJSON(asString)
- assertEquals(streamsAndParts, backToStreamsAndParts)
+ testRunner("New TaskName added, all previous missing this run",
+ Set(new TaskName("Partition 3"), new TaskName("Partition 4")),
+ Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2),
+ Map(new TaskName("Partition 0") -> 0, new TaskName("Partition 1") -> 1, new TaskName("Partition 2") -> 2, new TaskName("Partition 3") -> 3, new TaskName("Partition 4") -> 4))
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..5d8ee4f
--- /dev/null
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import java.util
+import org.apache.samza.SamzaException
+import org.apache.samza.container.TaskName
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import scala.collection.JavaConversions._
+
+/**
+ * Kafka Checkpoint Log-specific key used to identify what type of entry is
+ * written for any particular log entry.
+ *
+ * @param map Backing map to hold key values
+ */
+class KafkaCheckpointLogKey private (val map: Map[String, String]) {
+ // This might be better as a case class...
+ import KafkaCheckpointLogKey._
+
+ /**
+ * Serialize this key to bytes
+ * @return Key as bytes
+ */
+ def toBytes(): Array[Byte] = {
+ val jMap = new util.HashMap[String, String](map.size)
+ jMap.putAll(map)
+
+ JSON_MAPPER.writeValueAsBytes(jMap)
+ }
+
+ private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key"))
+
+ /**
+ * Is this key for a checkpoint entry?
+ *
+ * @return true iff this key's entry is for a checkpoint
+ */
+ def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
+
+ /**
+ * Is this key for a changelog partition mapping?
+ *
+ * @return true iff this key's entry is for a changelog partition mapping
+ */
+ def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
+
+ /**
+ * If this Key is for a checkpoint entry, return its associated TaskName.
+ *
+ * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
+ */
+ def getCheckpointTaskName = {
+ // Report the backing map in the error: this class defines no toString, so "+ this" would print only an opaque identity string.
+ new TaskName(map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + map)))
+ }
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: KafkaCheckpointLogKey =>
+ (that canEqual this) &&
+ map == that.map
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ // equals() compares only the backing map, so the map's hash is the whole hash (31 * 0 + h == h).
+ map.hashCode()
+ }
+}
+
+object KafkaCheckpointLogKey {
+ /**
+ * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
+ * type, either a checkpoint or a changelog-partition-mapping.
+ */
+ val CHECKPOINT_KEY_KEY = "type"
+ val CHECKPOINT_KEY_TYPE = "checkpoint"
+ val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
+ val CHECKPOINT_TASKNAME_KEY = "taskName"
+ val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
+
+ /**
+ * Partition mapping keys have no dynamic values, so we just need one instance.
+ */
+ val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
+
+ private val JSON_MAPPER = new ObjectMapper()
+ val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
+
+ var systemStreamPartitionGrouperFactoryString:Option[String] = None
+
+ /**
+ * Set the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key.
+ *
+ * @param str Config value of SystemStreamPartition Grouper Factory
+ */
+ def setSystemStreamPartitionGrouperFactoryString(str:String) = {
+ systemStreamPartitionGrouperFactoryString = Some(str)
+ }
+
+ /**
+ * Get the name of the factory configured to provide the SystemStreamPartition grouping
+ * so it can be included in the key.
+ */
+ def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
+
+ /**
+ * Build a key for a checkpoint log entry for a particular TaskName
+ * @param taskName TaskName to build for this checkpoint entry
+ *
+ * @return Key for checkpoint log entry
+ */
+ def getCheckpointKey(taskName:TaskName) = {
+ val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
+ CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
+ SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
+
+ new KafkaCheckpointLogKey(map)
+ }
+
+ /**
+ * Build a key for a changelog partition mapping entry
+ *
+ * @return Key for changelog partition mapping entry
+ */
+ def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
+
+ /**
+ * Deserialize a Kafka checkpoint log key
+ * @param bytes Serialized (via JSON) Kafka checkpoint log key
+ * @return Checkpoint log key
+ */
+ def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
+ try {
+ val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
+
+ if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
+ throw new SamzaException("No type entry in checkpoint key: " + jmap)
+ }
+
+ // Only checkpoint keys have ssp grouper factory keys
+ if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
+ val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
+
+ if (sspGrouperFactory == null) {
+ throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
+ }
+
+ if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
+ throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
+ }
+ }
+
+ new KafkaCheckpointLogKey(jmap.toMap)
+ } catch {
+ case e: Exception =>
+ throw new SamzaException("Exception while deserializing checkpoint key", e)
+ }
+ }
+}
+
+class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
+ override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
+ ") does not match value from current configuration (" + inConfig + "). " +
+ "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 15245d4..fff62e4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -19,76 +19,112 @@
package org.apache.samza.checkpoint.kafka
-import org.I0Itec.zkclient.ZkClient
import grizzled.slf4j.Logging
+import java.nio.ByteBuffer
+import java.util
import kafka.admin.AdminUtils
-import kafka.api.FetchRequestBuilder
-import kafka.api.OffsetRequest
-import kafka.api.PartitionOffsetRequestInfo
+import kafka.api._
import kafka.common.ErrorMapping
+import kafka.common.InvalidMessageSizeException
import kafka.common.TopicAndPartition
import kafka.common.TopicExistsException
-import kafka.common.InvalidMessageSizeException
import kafka.common.UnknownTopicOrPartitionException
import kafka.consumer.SimpleConsumer
+import kafka.message.InvalidMessageException
import kafka.producer.KeyedMessage
-import kafka.producer.Partitioner
import kafka.producer.Producer
-import kafka.serializer.Decoder
-import kafka.serializer.Encoder
import kafka.utils.Utils
-import kafka.utils.VerifiableProperties
-import kafka.message.InvalidMessageException
-import org.apache.samza.Partition
+import org.I0Itec.zkclient.ZkClient
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.serializers.Serde
import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.TopicMetadataStore
import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.TopicMetadataStore
+import scala.collection.mutable
/**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic that
- * is uniquely identified by a job/partition combination. To read a checkpoint
- * for a given job and partition combination (e.g. my-job, partition 1), we
- * simply read the last message from the topic: __samza_checkpoint_my-job_1. If
- * the topic does not yet exist, we assume that there is not yet any state for
- * this job/partition pair, and return an empty checkpoint.
+ * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
+ * To read a checkpoint for a specific taskName, we find the newest message
+ * keyed to that taskName. If there is no such message, no checkpoint data
+ * exists. The underlying log has a single partition into which all
+ * checkpoints and TaskName to changelog partition mappings are written.
*/
class KafkaCheckpointManager(
clientId: String,
checkpointTopic: String,
systemName: String,
- totalPartitions: Int,
replicationFactor: Int,
socketTimeout: Int,
bufferSize: Int,
fetchSize: Int,
metadataStore: TopicMetadataStore,
- connectProducer: () => Producer[Partition, Array[Byte]],
+ connectProducer: () => Producer[Array[Byte], Array[Byte]],
connectZk: () => ZkClient,
+ systemStreamPartitionGrouperFactoryString: String,
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- serde: Serde[Checkpoint] = new CheckpointSerde) extends CheckpointManager with Logging {
+ serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging {
+ import KafkaCheckpointManager._
+
+ var taskNames = Set[TaskName]()
+ var producer: Producer[Array[Byte], Array[Byte]] = null
+ var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
- var partitions = Set[Partition]()
- var producer: Producer[Partition, Array[Byte]] = null
+ var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
- info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format (clientId, checkpointTopic, systemName))
+ KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
- def writeCheckpoint(partition: Partition, checkpoint: Checkpoint) {
+ info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
+
+ /**
+ * Write Checkpoint for specified taskName to log
+ *
+ * @param taskName Specific Samza taskName for which to write a checkpoint.
+ * @param checkpoint Checkpoint whose offset data is serialized and written to the log.
+ **/
+ override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
+ val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
+ val keyBytes = key.toBytes()
+ val msgBytes = serde.toBytes(checkpoint)
+
+ writeLog(CHECKPOINT_LOG4J_ENTRY, keyBytes, msgBytes)
+ }
+
+ /**
+ * Write the taskName to partition mapping that is being maintained by this CheckpointManager
+ *
+ * @param changelogPartitionMapping Each TaskName's partition within the changelog
+ */
+ override def writeChangeLogPartitionMapping(changelogPartitionMapping: util.Map[TaskName, java.lang.Integer]) {
+ val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
+ val keyBytes = key.toBytes()
+ val msgBytes = serde.changelogPartitionMappingToBytes(changelogPartitionMapping)
+
+ writeLog(CHANGELOG_PARTITION_MAPPING_LOG4j, keyBytes, msgBytes)
+ }
+
+ /**
+ * Common code for writing either checkpoints or changelog-partition-mappings to the log
+ *
+ * @param logType Type of entry that is being written, for logging
+ * @param key pre-serialized key for message
+ * @param msg pre-serialized message to write to log
+ */
+ private def writeLog(logType:String, key: Array[Byte], msg: Array[Byte]) {
retryBackoff.run(
loop => {
if (producer == null) {
producer = connectProducer()
}
- producer.send(new KeyedMessage(checkpointTopic, null, partition, serde.toBytes(checkpoint)))
+
+ producer.send(new KeyedMessage(checkpointTopic, key, 0, msg))
loop.done
},
(exception, loop) => {
- warn("Failed to send checkpoint %s for partition %s: %s. Retrying." format (checkpoint, partition, exception))
+ warn("Failed to write %s partition entry %s: %s. Retrying." format(logType, key, exception))
debug("Exception detail:", exception)
if (producer != null) {
producer.close
@@ -98,124 +134,219 @@ class KafkaCheckpointManager(
)
}
- def readLastCheckpoint(partition: Partition): Checkpoint = {
- info("Reading checkpoint for partition %s." format partition.getPartitionId)
+ private def getConsumer(): SimpleConsumer = {
+ val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
+ val metadata = metadataMap(checkpointTopic)
+ val partitionMetadata = metadata.partitionsMetadata
+ .filter(_.partitionId == 0)
+ .headOption
+ .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
+ val leader = partitionMetadata
+ .leader
+ .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
+
+ info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
+
+ new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
+ }
+
+ private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
+
+ private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
+ val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
+ .partitionErrorAndOffsets
+ .get(topicAndPartition)
+ .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
+ // Fail or retry if there was an issue with the offset request.
+ ErrorMapping.maybeThrowException(offsetResponse.error)
+
+ val offset: Long = offsetResponse
+ .offsets
+ .headOption
+ .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
+
+ offset
+ }
+
+ /**
+ * Read the last checkpoint for specified TaskName
+ *
+ * @param taskName Specific Samza taskName for which to get the last checkpoint.
+ **/
+ override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
+ if (!taskNames.contains(taskName)) {
+ throw new SamzaException(taskName + " not registered with this CheckpointManager")
+ }
+
+ info("Reading checkpoint for taskName " + taskName)
+
+ if (taskNamesToOffsets == null) {
+ info("No TaskName to checkpoint mapping provided. Reading for first time.")
+ taskNamesToOffsets = readCheckpointsFromLog()
+ } else {
+ info("Already existing checkpoint mapping. Merging new offsets")
+ taskNamesToOffsets ++= readCheckpointsFromLog()
+ }
+
+ val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
+
+ info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
+
+ checkpoint
+ }
+
+ /**
+ * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
+ */
+ private def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
+ val checkpoints = mutable.Map[TaskName, Checkpoint]()
+
+ def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
+
+ def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+ val taskName = checkpointKey.getCheckpointTaskName
+
+ if (taskNames.contains(taskName)) {
+ val checkpoint = serde.fromBytes(Utils.readBytes(payload))
+
+ debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
+
+ checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
+ }
+ }
+
+ readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
+
+ checkpoints.toMap /* of the immutable kind */
+ }
+
+ /**
+ * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
+ *
+ * Lots of code is duplicated from the checkpoint method, but it will be better to refactor this code into AM-based
+ * checkpoint log reading.
+ */
+ override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
+ var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
+
+ def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
+
+ def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
+ changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
+
+ debug("Adding changelog partition mapping" + changelogPartitionMapping)
+ }
- val checkpoint = retryBackoff.run(
+ readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
+
+ changelogPartitionMapping
+ }
+
+ /**
+ * Common code for reading both checkpoints and the changelog partition mapping from the log
+ *
+ * @param entryType What type of entry to look for within the log keys
+ * @param handleEntry Code to handle an entry in the log once it's found
+ */
+ private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
+ handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
+ retryBackoff.run[Unit](
loop => {
- // Assume checkpoint topic exists with correct partitions, since it should be verified on start.
- // Fetch the metadata for this checkpoint topic/partition pair.
- val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
- val metadata = metadataMap(checkpointTopic)
- val partitionMetadata = metadata.partitionsMetadata
- .filter(_.partitionId == partition.getPartitionId)
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition %d, but it didn't exist in Kafka." format partition.getPartitionId))
- val partitionId = partitionMetadata.partitionId
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
- info("Connecting to leader %s:%d for topic %s and partition %s to fetch last checkpoint message." format (leader.host, leader.port, checkpointTopic, partitionId))
-
- val consumer = new SimpleConsumer(
- leader.host,
- leader.port,
- socketTimeout,
- bufferSize,
- clientId)
+ val consumer = getConsumer()
+
+ val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
+
try {
- val topicAndPartition = new TopicAndPartition(checkpointTopic, partitionId)
- val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
- .partitionErrorAndOffsets
- .get(topicAndPartition)
- .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (checkpointTopic, partitionId)))
+ var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
- // Fail or retry if there was an an issue with the offset request.
- ErrorMapping.maybeThrowException(offsetResponse.error)
+ info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
- val offset = offsetResponse
- .offsets
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (checkpointTopic, partitionId)))
+ val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
- info("Got offset %s for topic %s and partition %s. Attempting to fetch message." format (offset, checkpointTopic, partitionId))
+ info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
- if (offset <= 0) {
- info("Got offset 0 (no messages in checkpoint topic) for topic %s and partition %s, so returning null. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (checkpointTopic, partition))
- return null
+ if (offset < 0) {
+ info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
+ return
}
- val request = new FetchRequestBuilder()
- // Kafka returns 1 greater than the offset of the last message in
- // the topic, so subtract one to fetch the last message.
- .addFetch(checkpointTopic, partitionId, offset - 1, fetchSize)
- .maxWait(500)
- .minBytes(1)
- .clientId(clientId)
- .build
- val messageSet = consumer.fetch(request)
- if (messageSet.hasError) {
- warn("Got error code from broker for %s: %s" format (checkpointTopic, messageSet.errorCode(checkpointTopic, partitionId)))
- val errorCode = messageSet.errorCode(checkpointTopic, partitionId)
- if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
- warn("Got an offset out of range exception while getting last checkpoint for topic %s and partition %s, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (checkpointTopic, partitionId))
- return null
+ while (offset < latestOffset) {
+ val request = new FetchRequestBuilder()
+ .addFetch(checkpointTopic, 0, offset, fetchSize)
+ .maxWait(500)
+ .minBytes(1)
+ .clientId(clientId)
+ .build
+
+ val fetchResponse = consumer.fetch(request)
+ if (fetchResponse.hasError) {
+ warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
+ val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
+ if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
+ warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
+ return
+ }
+ ErrorMapping.maybeThrowException(errorCode)
}
- ErrorMapping.maybeThrowException(errorCode)
- }
- val messages = messageSet.messageSet(checkpointTopic, partitionId).toList
- if (messages.length != 1) {
- throw new KafkaCheckpointException("Something really unexpected happened. Got %s "
- + "messages back when fetching from checkpoint topic %s and partition %s. "
- + "Expected one message. It would be unsafe to go on without the latest checkpoint, "
- + "so failing." format (messages.length, checkpointTopic, partition))
- }
+ for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
+ offset = response.nextOffset
+ startingOffset = Some(offset) // For next time we call
+
+ if (!response.message.hasKey) {
+ throw new KafkaCheckpointException("Encountered message without key.")
+ }
- // Some back bending to go from message to checkpoint.
- val checkpoint = serde.fromBytes(Utils.readBytes(messages(0).message.payload))
- loop.done
- checkpoint
+ val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
+
+ if (!shouldHandleEntry(checkpointKey)) {
+ debug("Skipping " + entryType + " entry with key " + checkpointKey)
+ } else {
+ handleEntry(response.message.payload, checkpointKey)
+ }
+ }
+ }
} finally {
- consumer.close
+ consumer.close()
}
+
+ loop.done
+ Unit
},
(exception, loop) => {
exception match {
- case e: InvalidMessageException => throw new KafkaCheckpointException ("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: InvalidMessageSizeException => throw new KafkaCheckpointException ("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException ("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
+ case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
case e: KafkaCheckpointException => throw e
case e: Exception =>
- warn("While trying to read last checkpoint for topic %s and partition %s: %s. Retrying." format (checkpointTopic, partition, e))
+ warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
debug("Exception detail:", e)
}
}
- ).getOrElse(throw new SamzaException("Failed to get checkpoint for partition %s" format partition.getPartitionId))
+ ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
- info("Got checkpoint state for partition %s: %s" format (partition.getPartitionId, checkpoint))
- checkpoint
}
def start {
- createTopic
+ create
validateTopic
}
- def register(partition: Partition) {
- partitions += partition
+ def register(taskName: TaskName) {
+ debug("Adding taskName " + taskName + " to " + this)
+ taskNames += taskName
}
def stop = {
- if(producer != null) {
+ if (producer != null) {
producer.close
}
}
- private def createTopic {
- info("Attempting to create checkpoint topic %s with %s partitions." format (checkpointTopic, totalPartitions))
+ def create {
+ info("Attempting to create checkpoint topic %s." format checkpointTopic)
retryBackoff.run(
loop => {
val zkClient = connectZk()
@@ -223,7 +354,7 @@ class KafkaCheckpointManager(
AdminUtils.createTopic(
zkClient,
checkpointTopic,
- totalPartitions,
+ 1,
replicationFactor)
} finally {
zkClient.close
@@ -239,7 +370,7 @@ class KafkaCheckpointManager(
info("Checkpoint topic %s already exists." format checkpointTopic)
loop.done
case e: Exception =>
- warn("Failed to create topic %s: %s. Retrying." format (checkpointTopic, e))
+ warn("Failed to create topic %s: %s. Retrying." format(checkpointTopic, e))
debug("Exception detail:", e)
}
}
@@ -255,8 +386,8 @@ class KafkaCheckpointManager(
ErrorMapping.maybeThrowException(topicMetadata.errorCode)
val partitionCount = topicMetadata.partitionsMetadata.length
- if (partitionCount != totalPartitions) {
- throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count %s." format (checkpointTopic, topicMetadata.partitionsMetadata.length, totalPartitions))
+ if (partitionCount != 1) {
+ throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
}
info("Successfully validated checkpoint topic %s." format checkpointTopic)
@@ -267,14 +398,19 @@ class KafkaCheckpointManager(
exception match {
case e: KafkaCheckpointException => throw e
case e: Exception =>
- warn("While trying to validate topic %s: %s. Retrying." format (checkpointTopic, e))
+ warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
debug("Exception detail:", e)
}
}
)
}
- override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format (systemName, checkpointTopic)
+ override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
+}
+
+object KafkaCheckpointManager {
+ val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
+ val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index cb6dbdf..087c6ad 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -19,26 +19,28 @@
package org.apache.samza.checkpoint.kafka
-import org.apache.samza.config.{ KafkaConfig, Config }
-import org.apache.samza.SamzaException
-import java.util.Properties
+import grizzled.slf4j.Logging
import kafka.producer.Producer
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.KafkaConfig.Config2Kafka
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.CheckpointManagerFactory
+import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.Partition
-import grizzled.slf4j.Logging
+import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
-import org.apache.samza.util.Util
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZKStringSerializer
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.checkpoint.CheckpointManager
+object KafkaCheckpointManagerFactory {
+ /**
+ * Version number to track the format of the checkpoint log
+ */
+ val CHECKPOINT_LOG_VERSION_NUMBER = 1
+}
class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
+ import KafkaCheckpointManagerFactory._
+
def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
val systemName = config
@@ -60,7 +62,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
val connectProducer = () => {
- new Producer[Partition, Array[Byte]](producerConfig)
+ new Producer[Array[Byte], Array[Byte]](producerConfig)
}
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
@@ -73,24 +75,24 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
.getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
val metadataStore = new ClientUtilTopicMetadataStore(brokersListString, clientId, socketTimeout)
val checkpointTopic = getTopic(jobName, jobId)
-
- // This is a reasonably expensive operation and the TaskInstance already knows the answer. Should use that info.
- val totalPartitions = Util.getInputStreamPartitions(config).map(_.getPartition).toSet.size
+
+ // Find out the SSPGrouperFactory class so it can be included/verified in the key
+ val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
new KafkaCheckpointManager(
clientId,
checkpointTopic,
systemName,
- totalPartitions,
replicationFactor,
socketTimeout,
bufferSize,
fetchSize,
metadataStore,
connectProducer,
- connectZk)
+ connectZk,
+ systemStreamPartitionGrouperFactoryString)
}
private def getTopic(jobName: String, jobId: String) =
- "__samza_checkpoint_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+ "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 8a8834f..9553050 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -19,8 +19,6 @@
package org.apache.samza.system.kafka
-import scala.annotation.implicitNotFound
-
import grizzled.slf4j.Logging
import kafka.api.TopicMetadata
import kafka.common.ErrorMapping
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
new file mode 100644
index 0000000..7a23041
--- /dev/null
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint.kafka
+
+import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.apache.samza.SamzaException
+
+class TestKafkaCheckpointLogKey {
+ @Before
+ def setSSPGrouperFactoryString() {
+ KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
+ }
+
+ @Test
+ def checkpointKeySerializationRoundTrip() {
+ val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
+ val asBytes = checkpointKey.toBytes()
+ val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+ assertEquals(checkpointKey, backFromBytes)
+ assertNotSame(checkpointKey, backFromBytes)
+ }
+
+ @Test
+ def changelogPartitionMappingKeySerializationRoundTrip() {
+ val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
+ val asBytes = key.toBytes()
+ val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
+
+ assertEquals(key, backFromBytes)
+ assertNotSame(key, backFromBytes)
+ }
+
+ @Test
+ def differingSSPGrouperFactoriesCauseException() {
+
+ val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
+
+ val asBytes = checkpointKey.toBytes()
+
+ KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
+
+ var gotException = false
+ try {
+ KafkaCheckpointLogKey.fromBytes(asBytes)
+ } catch {
+ case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage)
+ gotException = true
+ }
+
+ assertTrue("Should have had an exception since ssp grouper factories didn't match", gotException)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 92ac61e..cddee13 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,11 +19,9 @@
package org.apache.samza.checkpoint.kafka
-import org.I0Itec.zkclient.ZkClient
-import org.junit.Assert._
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
+import kafka.common.InvalidMessageSizeException
+import kafka.common.UnknownTopicOrPartitionException
+import kafka.message.InvalidMessageException
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import kafka.server.KafkaConfig
@@ -31,20 +29,20 @@ import kafka.server.KafkaServer
import kafka.utils.TestUtils
import kafka.utils.TestZKUtils
import kafka.utils.Utils
+import kafka.utils.ZKStringSerializer
import kafka.zk.EmbeddedZookeeper
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.Partition
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.config.MapConfig
+import org.I0Itec.zkclient.ZkClient
import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.container.TaskName
import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStream
-import kafka.utils.ZKStringSerializer
-import kafka.message.InvalidMessageException
-import kafka.common.InvalidMessageSizeException
-import kafka.common.UnknownTopicOrPartitionException
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
+import org.apache.samza.{SamzaException, Partition}
+import org.junit.Assert._
+import org.junit.{AfterClass, BeforeClass, Test}
+import scala.collection.JavaConversions._
+import scala.collection._
+import org.apache.samza.container.systemstreampartition.groupers.GroupByPartitionFactory
object TestKafkaCheckpointManager {
val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -72,14 +70,16 @@ object TestKafkaCheckpointManager {
config.put("request.required.acks", "-1")
val producerConfig = new ProducerConfig(config)
val partition = new Partition(0)
- val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
- val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+ val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
+ val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
var zookeeper: EmbeddedZookeeper = null
var server1: KafkaServer = null
var server2: KafkaServer = null
var server3: KafkaServer = null
var metadataStore: TopicMetadataStore = null
+ val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
+
@BeforeClass
def beforeSetupServers {
zookeeper = new EmbeddedZookeeper(zkConnect)
@@ -108,42 +108,45 @@ class TestKafkaCheckpointManager {
import TestKafkaCheckpointManager._
@Test
- def testCheckpointShouldBeNullIfcheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
+ def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
val kcm = getKafkaCheckpointManager
- kcm.register(partition)
+ val taskName = new TaskName(partition.toString)
+ kcm.register(taskName)
kcm.start
- var readCp = kcm.readLastCheckpoint(partition)
+ var readCp = kcm.readLastCheckpoint(taskName)
// read before topic exists should result in a null checkpoint
- assert(readCp == null)
+ assertNull(readCp)
// create topic the first time around
- kcm.writeCheckpoint(partition, cp1)
- readCp = kcm.readLastCheckpoint(partition)
- assert(cp1.equals(readCp))
+ kcm.writeCheckpoint(taskName, cp1)
+ readCp = kcm.readLastCheckpoint(taskName)
+ assertEquals(cp1, readCp)
// should get an exception if partition doesn't exist
try {
- readCp = kcm.readLastCheckpoint(new Partition(1))
+ readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
} catch {
- case e: Exception => None // expected
+ case e: SamzaException => None // expected
+ case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
}
// writing a second message should work, too
- kcm.writeCheckpoint(partition, cp2)
- readCp = kcm.readLastCheckpoint(partition)
- assert(cp2.equals(readCp))
+ kcm.writeCheckpoint(taskName, cp2)
+ readCp = kcm.readLastCheckpoint(taskName)
+ assertEquals(cp2, readCp)
kcm.stop
}
@Test
- def testUnrecovableKafkaErrorShouldThrowKafkaCheckpointManagerException {
+ def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
exceptions.foreach { exceptionName =>
val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
- kcm.register(partition)
+ val taskName = new TaskName(partition.toString)
+ kcm.register(taskName)
kcm.start
- kcm.writeCheckpoint(partition, cp1)
+ kcm.writeCheckpoint(taskName, cp1)
// because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
try {
- val readCpInvalide = kcm.readLastCheckpoint(partition)
+ kcm.readLastCheckpoint(taskName)
fail("Expected a KafkaCheckpointException.")
} catch {
case e: KafkaCheckpointException => None
@@ -156,28 +159,28 @@ class TestKafkaCheckpointManager {
clientId = "some-client-id",
checkpointTopic = "checkpoint-topic",
systemName = "kafka",
- totalPartitions = 1,
replicationFactor = 3,
socketTimeout = 30000,
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
metadataStore = metadataStore,
- connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
- connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+ connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
+ connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+ systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString)
// inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
clientId = "some-client-id-invalid-serde",
checkpointTopic = "checkpoint-topic-invalid-serde",
systemName = "kafka",
- totalPartitions = 1,
replicationFactor = 3,
socketTimeout = 30000,
bufferSize = 64 * 1024,
fetchSize = 300 * 1024,
metadataStore = metadataStore,
- connectProducer = () => new Producer[Partition, Array[Byte]](producerConfig),
+ connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig),
connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+ systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
serde = new InvalideSerde(exception))
class InvalideSerde(exception: String) extends CheckpointSerde {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 6be9732..be1670c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -22,10 +22,8 @@
package org.apache.samza.system.kafka
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Test, BeforeClass, AfterClass}
import kafka.zk.EmbeddedZookeeper
-import org.junit.BeforeClass
-import org.junit.AfterClass
import org.apache.samza.util.ClientUtilTopicMetadataStore
import org.I0Itec.zkclient.ZkClient
import kafka.admin.AdminUtils
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 751fe4c..6652f6b 100644
--- a/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv-leveldb/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -19,13 +19,9 @@
package org.apache.samza.storage.kv
-import java.nio.ByteBuffer
import org.iq80.leveldb._
-import org.fusesource.leveldbjni.internal.NativeComparator
import org.fusesource.leveldbjni.JniDBFactory._
import java.io._
-import java.util.Iterator
-import java.lang.Iterable
import org.apache.samza.config.Config
import org.apache.samza.container.SamzaContainerContext
import grizzled.slf4j.{ Logger, Logging }
@@ -39,8 +35,8 @@ object LevelDbKeyValueStore {
val options = new Options
// Cache size and write buffer size are specified on a per-container basis.
- options.cacheSize(cacheSize / containerContext.partitions.size)
- options.writeBufferSize((writeBufSize / containerContext.partitions.size).toInt)
+ options.cacheSize(cacheSize / containerContext.taskNames.size)
+ options.writeBufferSize((writeBufSize / containerContext.taskNames.size).toInt)
options.blockSize(storeConfig.getInt("leveldb.block.size.bytes", 4096))
options.compressionType(
storeConfig.get("leveldb.compression", "snappy") match {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index 222c130..f20bb7f 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -20,6 +20,7 @@
package org.apache.samza.test.integration.join;
import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,12 +46,12 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
private KeyValueStore<String, String> state;
private int max;
- private String partition;
+ private TaskName taskName;
@Override
public void init(Config config, TaskContext context) {
this.state = (KeyValueStore<String, String>) context.getStore("emitter-state");
- this.partition = Integer.toString(context.getPartition().getPartitionId());
+ this.taskName = context.getTaskName();
this.max = config.getInt("count");
}
@@ -79,7 +80,7 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
}
int counter = getInt(COUNT);
if(counter < max) {
- OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + partition);
+ OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName);
collector.send(envelope);
this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index c0ac5dd..7d0b8db 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.test.performance
import grizzled.slf4j.Logging
import org.apache.samza.config.Config
import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.container.{TaskName, SamzaContainerContext}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.kv.KeyValueStorageEngine
@@ -34,7 +34,6 @@ import org.apache.samza.serializers.ByteSerde
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import java.io.File
-import scala.collection.JavaConversions._
import java.util.UUID
/**
@@ -79,7 +78,9 @@ object TestKeyValuePerformance extends Logging {
val numLoops = config.getInt("test.num.loops", 100)
val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
- val partitions = (0 until partitionCount).map(new Partition(_))
+ val taskNames = new java.util.ArrayList[TaskName]()
+ // One TaskName per partition index; foreach because add() is invoked purely for its side effect.
+ (0 until partitionCount).foreach(p => taskNames.add(new TaskName(new Partition(p).toString)))
info("Using partition count: %s" format partitionCount)
info("Using num loops: %s" format numLoops)
@@ -109,7 +110,7 @@ object TestKeyValuePerformance extends Logging {
new ReadableCollector,
new MetricsRegistryMap,
null,
- new SamzaContainerContext("test", config, partitions))
+ new SamzaContainerContext("test", config, taskNames))
val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) {
throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index dc44a99..3ed8b5c 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -19,64 +19,55 @@
package org.apache.samza.test.integration
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskContext
-import org.apache.samza.task.InitableTask
-import org.apache.samza.config.Config
-import scala.collection.JavaConversions._
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.checkpoint.Checkpoint
-import org.junit.BeforeClass
-import org.junit.AfterClass
-import kafka.zk.EmbeddedZookeeper
-import kafka.utils.TestUtils
-import org.apache.samza.system.SystemStream
-import kafka.utils.TestZKUtils
-import kafka.server.KafkaConfig
-import org.I0Itec.zkclient.ZkClient
+import java.util.Properties
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import kafka.admin.AdminUtils
+import kafka.common.ErrorMapping
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import kafka.message.MessageAndMetadata
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
import kafka.producer.ProducerConfig
+import kafka.server.KafkaConfig
import kafka.server.KafkaServer
+import kafka.utils.TestUtils
+import kafka.utils.TestZKUtils
import kafka.utils.Utils
-import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.util._
-import org.junit.Test
-import kafka.admin.AdminUtils
-import kafka.common.ErrorMapping
-import org.junit.Assert._
import kafka.utils.ZKStringSerializer
-import scala.collection.mutable.ArrayBuffer
-import org.apache.samza.job.local.LocalJobFactory
-import org.apache.samza.job.ApplicationStatus
-import java.util.concurrent.CountDownLatch
-import org.apache.samza.job.local.ThreadJob
-import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.util.ClientUtilTopicMetadataStore
+import kafka.zk.EmbeddedZookeeper
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.Partition
+import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.Config
import org.apache.samza.config.MapConfig
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.ApplicationStatus
+import org.apache.samza.job.StreamJob
+import org.apache.samza.job.local.LocalJobFactory
+import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.container.SamzaContainer
+import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
+import org.apache.samza.task.InitableTask
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskContext
+import org.apache.samza.task.TaskCoordinator
+import org.apache.samza.task.TaskCoordinator.RequestScope
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.{BeforeClass, AfterClass, Test}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap
-import org.apache.samza.Partition
-import java.util.concurrent.TimeUnit
-import kafka.producer.Producer
-import kafka.producer.KeyedMessage
-import java.util.concurrent.Semaphore
-import java.util.concurrent.CyclicBarrier
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import java.util.concurrent.Executors
-import kafka.message.MessageAndOffset
-import kafka.message.MessageAndMetadata
-import org.apache.samza.job.StreamJob
-import org.apache.samza.task.TaskCoordinator.RequestScope
object TestStatefulTask {
val INPUT_TOPIC = "input"
val STATE_TOPIC = "mystore"
- val TOTAL_PARTITIONS = 1
+ val TOTAL_TASK_NAMES = 1
val REPLICATION_FACTOR = 3
val zkConnect: String = TestZKUtils.zookeeperConnect
@@ -102,8 +93,8 @@ object TestStatefulTask {
config.put("serializer.class", "kafka.serializer.StringEncoder");
val producerConfig = new ProducerConfig(config)
var producer: Producer[String, String] = null
- val cp1 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "123"))
- val cp2 = new Checkpoint(Map(new SystemStream("kafka", "topic") -> "12345"))
+ val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
+ val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
var zookeeper: EmbeddedZookeeper = null
var server1: KafkaServer = null
var server2: KafkaServer = null
@@ -128,13 +119,13 @@ object TestStatefulTask {
AdminUtils.createTopic(
zkClient,
INPUT_TOPIC,
- TOTAL_PARTITIONS,
+ TOTAL_TASK_NAMES,
REPLICATION_FACTOR)
AdminUtils.createTopic(
zkClient,
STATE_TOPIC,
- TOTAL_PARTITIONS,
+ TOTAL_TASK_NAMES,
REPLICATION_FACTOR)
}
@@ -221,7 +212,14 @@ class TestStatefulTask {
"systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
"systems.kafka.samza.msg.serde" -> "string",
"systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1))
+ "systems.kafka.producer.metadata.broker.list" -> ("localhost:%s" format port1),
+ // The task uses state, so a checkpoint manager is required
+ "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
+ "task.checkpoint.system" -> "kafka",
+ "task.checkpoint.replication.factor" -> "1",
+ // However, the input stream must ignore checkpointed offsets (reset.offset = true),
+ // since the second part of the test expects to replay the input streams.
+ "systems.kafka.streams.input.samza.reset.offset" -> "true")
@Test
def testShouldStartAndRestore {
@@ -278,7 +276,7 @@ class TestStatefulTask {
count += 1
}
- assertTrue(count < 100)
+ assertTrue("Timed out waiting to receive messages. Received thus far: " + task.received.size, count < 100)
// Reset the count down latch after the 4 messages come in.
task.awaitMessage
@@ -328,8 +326,7 @@ class TestStatefulTask {
TestTask.awaitTaskRegistered
val tasks = TestTask.tasks
- // Should only have one partition.
- assertEquals(1, tasks.size)
+ assertEquals("Should only have a single partition in this task", 1, tasks.size)
val task = tasks.values.toList.head
@@ -392,25 +389,25 @@ class TestStatefulTask {
}
object TestTask {
- val tasks = new HashMap[Partition, TestTask] with SynchronizedMap[Partition, TestTask]
- @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+ val tasks = new HashMap[TaskName, TestTask] with SynchronizedMap[TaskName, TestTask]
+ @volatile var allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
/**
* Static method that tasks can use to register themselves with. Useful so
* we don't have to sneak into the ThreadJob/SamzaContainer to get our test
* tasks.
*/
- def register(partition: Partition, task: TestTask) {
- tasks += partition -> task
+ def register(taskName: TaskName, task: TestTask) {
+ tasks += taskName -> task
allTasksRegistered.countDown
}
def awaitTaskRegistered {
allTasksRegistered.await(60, TimeUnit.SECONDS)
assertEquals(0, allTasksRegistered.getCount)
- assertEquals(TestStatefulTask.TOTAL_PARTITIONS, tasks.size)
+ assertEquals(TestStatefulTask.TOTAL_TASK_NAMES, tasks.size)
// Reset the registered latch, so we can use it again every time we start a new job.
- TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_PARTITIONS)
+ TestTask.allTasksRegistered = new CountDownLatch(TestStatefulTask.TOTAL_TASK_NAMES)
}
}
@@ -422,7 +419,7 @@ class TestTask extends StreamTask with InitableTask {
var gotMessage = new CountDownLatch(1)
def init(config: Config, context: TaskContext) {
- TestTask.register(context.getPartition, this)
+ TestTask.register(context.getTaskName, this)
store = context
.getStore(TestStatefulTask.STATE_TOPIC)
.asInstanceOf[KeyValueStore[String, String]]
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index ea6f03b..86fc0fd 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -119,21 +119,24 @@
%td.key Finished
%td= state.finishedTasks.size.toString
- %h3 Partition Assignment
- %table.table.table-striped.table-bordered.tablesorter#partitions-table
+ %h3 TaskName Assignment
+ %table.table.table-striped.table-bordered.tablesorter#taskids-table
%thead
%tr
%th Task ID
- %th Partitions
+ %th TaskName
+ %th SystemStreamPartitions
%th Container
%tbody
- - for((taskId, partitions) <- state.taskPartitions)
- %tr
- %td= taskId
- %td= partitions.map(_.getPartitionId).toList.sorted.mkString(", ")
- %td
- - val container = state.runningTasks(taskId)
- %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
+ - for((taskId, taskNames) <- state.taskToTaskNames)
+ - for((taskName, ssps) <- taskNames)
+ %tr
+ %td= taskId
+ %td= taskName
+ %td= ssps.map(_.toString).toList.sorted.mkString(", ")
+ %td
+ - val container = state.runningTasks(taskId)
+ %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
%div.tab-pane#config
%h2 Config
@@ -154,7 +157,7 @@
:javascript
$(document).ready(function() {
$("#containers-table").tablesorter();
- $("#partitions-table").tablesorter();
+ $("#taskids-table").tablesorter();
$("#config-table").tablesorter();
$("#config-table-filter").keyup(function() {
var regex = new RegExp($(this).val(), 'i');
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 01a2683..d9dfbc6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -18,12 +18,12 @@
*/
package org.apache.samza.job.yarn
-import org.apache.samza.config.Config
import grizzled.slf4j.Logging
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.samza.Partition
-import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.api.records.ContainerId
+import java.util
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.container.TaskName
/**
* Samza's application master has state that is usually manipulated based on
@@ -40,7 +40,7 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
var unclaimedTasks = Set[Int]()
var finishedTasks = Set[Int]()
var runningTasks = Map[Int, YarnContainer]()
- var taskPartitions = Map[Int, Set[Partition]]()
+ var taskToTaskNames = Map[Int, util.Map[TaskName, util.Set[SystemStreamPartition]]]()
var status = FinalApplicationStatus.UNDEFINED
// controlled by the service
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/da79b6f9/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index eb1ff54..0dd244d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
import java.util.Collections
import scala.collection.JavaConversions._
+import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
@@ -46,6 +47,7 @@ import org.apache.samza.job.ShellCommandBuilder
import org.apache.samza.util.Util
import grizzled.slf4j.Logging
+import org.apache.samza.container.TaskNamesToSystemStreamPartitions
object SamzaAppMasterTaskManager {
val DEFAULT_CONTAINER_MEM = 1024
@@ -72,7 +74,10 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
1
}
- val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
+ val tasksToSSPTaskNames: Map[Int, TaskNamesToSystemStreamPartitions] = Util.assignContainerToSSPTaskNames(config, state.taskCount)
+
+ val taskNameToChangeLogPartitionMapping = Util.getTaskNameToChangeLogPartitionMapping(config, tasksToSSPTaskNames)
+
var taskFailures = Map[Int, TaskFailure]()
var tooManyFailedContainers = false
// TODO we might want to use NMClientAsync as well
@@ -106,13 +111,14 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
state.unclaimedTasks.headOption match {
case Some(taskId) => {
info("Got available task id (%d) for container: %s" format (taskId, container))
- val streamsAndPartitionsForTask = Util.getStreamsAndPartitionsForContainer(taskId, state.taskCount, allSystemStreamPartitions)
- info("Claimed partitions %s for container ID %s" format (streamsAndPartitionsForTask, taskId))
+ val sspTaskNames: TaskNamesToSystemStreamPartitions = tasksToSSPTaskNames.getOrElse(taskId, TaskNamesToSystemStreamPartitions())
+ info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
.setConfig(config)
.setName("samza-container-%s" format taskId)
- .setStreamPartitions(streamsAndPartitionsForTask)
+ .setTaskNameToSystemStreamPartitionsMapping(sspTaskNames.getJavaFriendlyType)
+ .setTaskNameToChangeLogPartitionMapping(taskNameToChangeLogPartitionMapping.map(kv => kv._1 -> Integer.valueOf(kv._2)).asJava)
val command = cmdBuilder.buildCommand
info("Task ID %s using command %s" format (taskId, command))
val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
@@ -129,7 +135,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
state.neededContainers -= 1
state.runningTasks += taskId -> new YarnContainer(container)
state.unclaimedTasks -= taskId
- state.taskPartitions += taskId -> streamsAndPartitionsForTask.map(_.getPartition).toSet
+ state.taskToTaskNames += taskId -> sspTaskNames.getJavaFriendlyType
info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
@@ -151,7 +157,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
taskId match {
case Some(taskId) => {
state.runningTasks -= taskId
- state.taskPartitions -= taskId
+ state.taskToTaskNames -= taskId
}
case _ => None
}
@@ -315,4 +321,5 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
capability.setVirtualCores(cpuCores)
(0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
}
+
}