You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/07/29 20:31:45 UTC
[1/3] samza git commit: SAMZA-979 - Remove KafkaCheckpointMigration
Repository: samza
Updated Branches:
refs/heads/master 97a3be397 -> 944dd02e1
SAMZA-979 - Remove KafkaCheckpointMigration
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b0f840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b0f840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b0f840
Branch: refs/heads/master
Commit: 74b0f8405a73fedda7a0b4b517f3946027d15626
Parents: 97a3be3
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 12:58:13 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 12:58:23 2016 -0700
----------------------------------------------------------------------
.../apache/samza/checkpoint/OffsetManager.scala | 1 -
.../scala/org/apache/samza/job/JobRunner.scala | 4 -
.../samza/migration/JobRunnerMigration.scala | 58 -----
.../org/apache/samza/job/TestJobRunner.scala | 20 +-
.../migration/KafkaCheckpointMigration.scala | 149 ------------
.../TestKafkaCheckpointMigration.scala | 243 -------------------
6 files changed, 1 insertion(+), 474 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 7245902..f8033c5 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -223,7 +223,6 @@ class OffsetManager(
}
}
- partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2))
checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
Option(lastProcessedOffsets.get(taskName)) match {
case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a3613ff..383bb13 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.migration.JobRunnerMigration
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
@@ -122,9 +121,6 @@ class JobRunner(config: Config) extends Logging {
}
coordinatorSystemProducer.stop
- // Perform any migration plan to run in job runner
- JobRunnerMigration(config)
-
// Create the actual job, and submit it.
val job = jobFactory.getJob(config).submit
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
deleted file mode 100644
index f38b87a..0000000
--- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.migration
-
-import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging}
-import org.apache.samza.SamzaException
-
-
-object JobRunnerMigration {
- val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration"
- val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
- "for everything else, please use the checkpoint tool to migrate the taskname-to-changelog mapping, and add " +
- "task.checkpoint.skip-migration=true to your configs."
- def apply(config: Config) = {
- val migration = new JobRunnerMigration
- migration.checkpointMigration(config)
- }
-}
-
-class JobRunnerMigration extends Logging {
-
- def checkpointMigration(config: Config) = {
- val checkpointFactory = Option(config.get("task.checkpoint.factory"))
- checkpointFactory match {
- case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
- info("Performing checkpoint migration")
- val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION)
- checkpointMigrationPlan.migrate(config)
- case None =>
- info("No task.checkpoint.factory defined, not performing any checkpoint migration")
- case _ =>
- val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false)
- if (skipMigration) {
- info("Job is configured to skip any checkpoint migration.")
- } else {
- error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
- throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index e97656a..17c5297 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -21,13 +21,11 @@ package org.apache.samza.job
import java.io.File
-import org.apache.samza.SamzaException
import org.apache.samza.config.Config
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.migration.JobRunnerMigration
-import org.junit.Test
import org.junit.After
import org.junit.Assert._
+import org.junit.Test
object TestJobRunner {
var processCount = 0
@@ -41,22 +39,6 @@ class TestJobRunner {
}
@Test
- def testJobRunnerMigrationFails {
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
- try {
- JobRunner.main(Array(
- "--config-factory",
- "org.apache.samza.config.factories.PropertiesConfigFactory",
- "--config-path",
- "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
- fail("Should have failed already.")
- } catch {
- case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
- }
- }
-
- @Test
def testJobRunnerWorks {
MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
deleted file mode 100644
index 5d2641a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory}
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util._
-
-/**
- * Migrates changelog partition mapping from checkpoint topic to coordinator stream
- */
-class KafkaCheckpointMigration extends MigrationPlan with Logging {
- val source = "CHECKPOINTMIGRATION"
- val migrationKey = "CheckpointMigration09to10"
- val migrationVal = "true"
-
- var connectZk: () => ZkClient = null
-
- private def getCheckpointSystemName(config: Config): String = {
- config
- .getCheckpointSystem
- .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
- }
-
- private def getClientId(config: Config): String = {
- KafkaUtil.getClientId("samza-checkpoint-manager", config)
- }
-
- private def getTopicMetadataStore(config: Config): TopicMetadataStore = {
- val clientId = getClientId(config)
- val systemName = getCheckpointSystemName(config)
-
- val producerConfig = config.getKafkaSystemProducerConfig(
- systemName,
- clientId,
- KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-
- val consumerConfig = config.getKafkaSystemConsumerConfig(
- systemName,
- clientId)
-
- new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, consumerConfig.socketTimeoutMs)
- }
-
- private def getConnectZk(config: Config): () => ZkClient = {
- val clientId = getClientId(config)
-
- val checkpointSystemName = getCheckpointSystemName(config)
-
- val consumerConfig = config.getKafkaSystemConsumerConfig(
- checkpointSystemName,
- clientId)
-
- val zkConnectString = Option(consumerConfig.zkConnect)
- .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
- () => {
- new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer)
- }
- }
-
- override def migrate(config: Config) {
- val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. Cannot proceed with migration."))
- val jobId = config.getJobId.getOrElse("1")
-
- val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId)
-
- val coordinatorSystemFactory = new CoordinatorStreamSystemFactory
- val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
- val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-
- val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
- val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config))
-
- // make sure to validate that we only perform migration when checkpoint topic exists
- if (kafkaUtil.topicExists(checkpointTopicName)) {
- kafkaUtil.validateTopicPartitionCount(
- checkpointTopicName,
- getCheckpointSystemName(config),
- getTopicMetadataStore(config),
- 1,
- config.failOnCheckpointValidation)
-
- if (migrationVerification(coordinatorSystemConsumer)) {
- info("Migration %s was already performed, doing nothing" format migrationKey)
- return
- }
-
- info("No previous migration for %s were detected, performing migration" format migrationKey)
-
- info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName)
- val changelogMap = checkpointManager.readChangeLogPartitionMapping()
- checkpointManager.stop
-
- info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId))
- val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
- changelogPartitionManager.start()
- changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
- changelogPartitionManager.stop()
- }
- migrationCompletionMark(coordinatorSystemProducer)
-
- }
-
- def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
- coordinatorSystemConsumer.register()
- coordinatorSystemConsumer.start()
- coordinatorSystemConsumer.bootstrap()
- val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
- val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
- stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
- }
-
- def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
- info("Marking completion of migration %s" format migrationKey)
- val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
- coordinatorSystemProducer.register(source)
- coordinatorSystemProducer.start()
- coordinatorSystemProducer.send(message)
- coordinatorSystemProducer.stop()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
deleted file mode 100644
index 504fc89..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointLogKey, KafkaCheckpointManagerFactory}
-import org.apache.samza.config._
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.coordinator.MockSystemFactory
-import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
-import org.apache.samza.coordinator.stream._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util._
-import org.apache.samza.Partition
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.JavaConversions._
-import scala.collection._
-
-class TestKafkaCheckpointMigration {
-
- val checkpointTopic = "checkpoint-topic"
- val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
- val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- props1.put("controlled.shutdown.enable", "true")
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- props1.put("controlled.shutdown.enable", "true")
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- props1.put("controlled.shutdown.enable", "true")
-
- val config = new java.util.HashMap[String, Object]()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
- config.put("acks", "all")
- config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
- config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
- config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
- val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
- val partition = new Partition(0)
- val partition2 = new Partition(1)
- val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
- val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
- var metadataStore: TopicMetadataStore = null
-
- val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
-
- @Before
- def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
- metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
- }
-
- @After
- def afterCleanLogDirs {
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zookeeper.shutdown
- }
-
- private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, Integer], cpTopic: String = checkpointTopic) = {
- val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
- val record = new ProducerRecord(
- cpTopic,
- 0,
- KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
- new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
- )
- try {
- producer.send(record).get()
- } catch {
- case e: Exception => println(e.getMessage)
- } finally {
- producer.close()
- }
- }
-
- @Test
- def testMigrationWithNoCheckpointTopic() {
- val mapConfig = Map[String, String](
- "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
- JobConfig.JOB_NAME -> "test",
- JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
- JobConfig.JOB_CONTAINER_COUNT -> "2",
- "task.inputs" -> "test.stream1",
- "task.checkpoint.system" -> "test",
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- "systems.test.producer.bootstrap.servers" -> brokers,
- "systems.test.consumer.zookeeper.connect" -> zkConnect,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
-
- // Enable consumer caching
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
- val config: MapConfig = new MapConfig(mapConfig)
- val migrate = new KafkaCheckpointMigration
- migrate.migrate(config)
- val consumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, new NoOpMetricsRegistry)
- consumer.register()
- consumer.start()
- consumer.bootstrap()
- val bootstrappedStream = consumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
- assertEquals(1, bootstrappedStream.size())
-
- val expectedMigrationMessage = new SetMigrationMetaMessage("CHECKPOINTMIGRATION", "CheckpointMigration09to10", "true")
- assertEquals(expectedMigrationMessage, bootstrappedStream.head)
- consumer.stop()
- }
-
- @Test
- def testMigration() {
- try {
- val mapConfig = Map(
- "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
- JobConfig.JOB_NAME -> "test",
- JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
- JobConfig.JOB_CONTAINER_COUNT -> "2",
- "task.inputs" -> "test.stream1",
- "task.checkpoint.system" -> "test",
- "systems.test.producer.bootstrap.servers" -> brokers,
- "systems.test.consumer.zookeeper.connect" -> zkConnect,
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
- SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
- // Enable consumer caching
- MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
- val config = new MapConfig(mapConfig)
- val checkpointTopicName = KafkaUtil.getCheckpointTopic("test", "1")
- val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
- // Write a couple of checkpoints in the old checkpoint topic
- val task1 = new TaskName(partition.toString)
- val task2 = new TaskName(partition2.toString)
- checkpointManager.start
- checkpointManager.register(task1)
- checkpointManager.register(task2)
- checkpointManager.writeCheckpoint(task1, cp1)
- checkpointManager.writeCheckpoint(task2, cp2)
-
- // Write changelog partition info to the old checkpoint topic
- val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 10.asInstanceOf[Integer])
- writeChangeLogPartitionMapping(changelogMapping, checkpointTopicName)
- checkpointManager.stop
-
- // Initialize coordinator stream
- val coordinatorFactory = new CoordinatorStreamSystemFactory()
- val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
- val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
- coordinatorSystemConsumer.register()
- coordinatorSystemConsumer.start()
-
- assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 0)
- coordinatorSystemConsumer.stop
-
- // Start the migration
- val migrationInstance = new KafkaCheckpointMigration
- migrationInstance.migrate(config)
-
- // Verify if the changelogPartitionInfo has been migrated
- val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
- newChangelogManager.start
- val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
- newChangelogManager.stop
- assertEquals(newChangelogMapping.toMap, changelogMapping)
-
- // Check for migration message
- coordinatorSystemConsumer.register()
- coordinatorSystemConsumer.start()
- assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 1)
- coordinatorSystemConsumer.stop()
- }
- finally {
- MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
- }
- }
-
- class MockKafkaCheckpointMigration extends KafkaCheckpointMigration{
- var migrationCompletionMarkFlag: Boolean = false
- var migrationVerificationMarkFlag: Boolean = false
-
- override def migrationCompletionMark(coordinatorStreamProducer: CoordinatorStreamSystemProducer) = {
- migrationCompletionMarkFlag = true
- super.migrationCompletionMark(coordinatorStreamProducer)
- }
-
- override def migrationVerification(coordinatorStreamConsumer: CoordinatorStreamSystemConsumer): Boolean = {
- migrationVerificationMarkFlag = true
- super.migrationVerification(coordinatorStreamConsumer)
- }
- }
-}
[3/3] samza git commit: SAMZA-981 - Set consistent Kafka clientId for a job instance
Posted by na...@apache.org.
SAMZA-981 - Set consistent Kafka clientId for a job instance
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/944dd02e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/944dd02e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/944dd02e
Branch: refs/heads/master
Commit: 944dd02e1d00bcce59f1fcc33ecbb2a8acd95870
Parents: b3d5c4c
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 13:11:36 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 13:11:36 2016 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/KafkaConfig.scala | 4 +-
.../samza/config/RegExTopicGenerator.scala | 2 +-
.../system/kafka/KafkaSystemConsumer.scala | 2 +-
.../scala/org/apache/samza/util/KafkaUtil.scala | 6 +--
.../apache/samza/config/TestKafkaConfig.scala | 44 +++++++++-----------
.../system/kafka/TestKafkaSystemConsumer.scala | 13 +++---
6 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1822511..973ab8c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -148,7 +148,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// kafka config
def getKafkaSystemConsumerConfig(
systemName: String,
- clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString,
+ clientId: String,
groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
injectedProps: Map[String, String] = Map()) = {
@@ -163,7 +163,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getKafkaSystemProducerConfig(
systemName: String,
- clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString,
+ clientId: String,
injectedProps: Map[String, String] = Map()) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 78467bf..bcbad27 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
val systemName = config
.getRegexResolvedSystem(rewriterName)
.getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName)
+ val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b373753..36567b6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -64,7 +64,7 @@ private[kafka] class KafkaSystemConsumer(
systemAdmin: SystemAdmin,
metrics: KafkaSystemConsumerMetrics,
metadataStore: TopicMetadataStore,
- clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
+ clientId: String,
timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
bufferSize: Int = ConsumerConfig.SocketBufferSize,
fetchSize: StreamFetchSizes = new StreamFetchSizes,
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index a25ba62..e95f052 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -45,12 +45,10 @@ object KafkaUtil extends Logging {
config.getJobId.getOrElse("1"))
def getClientId(id: String, jobName: String, jobId: String): String =
- "%s-%s-%s-%s-%s" format
+ "%s-%s-%s" format
(id.replaceAll("[^A-Za-z0-9]", "_"),
jobName.replaceAll("[^A-Za-z0-9]", "_"),
- jobId.replaceAll("[^A-Za-z0-9]", "_"),
- System.currentTimeMillis,
- counter.getAndIncrement)
+ jobId.replaceAll("[^A-Za-z0-9]", "_"))
private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index c4a83f6..9c9c71e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -37,6 +37,8 @@ class TestKafkaConfig {
var props : Properties = new Properties
val SYSTEM_NAME = "kafka";
val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
+ val TEST_CLIENT_ID = "TestClientId"
+ val TEST_GROUP_ID = "TestGroupId"
@Before
def setupProperties() {
@@ -53,52 +55,46 @@ class TestKafkaConfig {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+ val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
val consumerClientId1 = consumerConfig1.clientId
val groupId1 = consumerConfig1.groupId
- val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+ val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId2")
val consumerClientId2 = consumerConfig2.clientId
val groupId2 = consumerConfig2.groupId
- assert(consumerClientId1.startsWith("undefined-samza-consumer-"))
- assert(consumerClientId2.startsWith("undefined-samza-consumer-"))
+ assert(consumerClientId1.equals("TestClientId1"))
+ assert(consumerClientId2.equals("TestClientId2"))
assert(groupId1.startsWith("undefined-samza-consumer-group-"))
assert(groupId2.startsWith("undefined-samza-consumer-group-"))
assert(consumerClientId1 != consumerClientId2)
assert(groupId1 != groupId2)
- val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId", "TestGroupId")
+ val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID, TEST_GROUP_ID)
val consumerClientId3 = consumerConfig3.clientId
val groupId3 = consumerConfig3.groupId
- assert(consumerClientId3 == "TestClientId")
- assert(groupId3 == "TestGroupId")
+ assert(consumerClientId3.equals(TEST_CLIENT_ID))
+ assert(groupId3.equals(TEST_GROUP_ID))
- val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId1")
val producerClientId1 = producerConfig1.clientId
- val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId2")
val producerClientId2 = producerConfig2.clientId
- assert(producerClientId1.startsWith("undefined-samza-producer-"))
- assert(producerClientId2.startsWith("undefined-samza-producer-"))
- assert(producerClientId1 != producerClientId2)
-
- val producerConfig3 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId")
- val producerClientId3 = producerConfig3.clientId
- assert(producerClientId3 == "TestClientId")
-
+ assert(producerClientId1.equals("TestClientId1"))
+ assert(producerClientId2.equals("TestClientId2"))
}
@Test
def testStreamLevelFetchSizeOverride() {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+ val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
// default fetch size
assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
val mapConfig1 = new MapConfig(props.toMap[String, String])
val kafkaConfig1 = new KafkaConfig(mapConfig1)
- val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+ val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
// shared fetch size
assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
@@ -140,7 +136,7 @@ class TestKafkaConfig {
def testDefaultValuesForProducerProperties() {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
val producerProperties = kafkaProducerConfig.getProducerProperties
assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
@@ -157,7 +153,7 @@ class TestKafkaConfig {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
val producerProperties = kafkaProducerConfig.getProducerProperties
assertEquals(expectedValue, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
@@ -171,7 +167,7 @@ class TestKafkaConfig {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
val producerProperties = kafkaProducerConfig.getProducerProperties
assertEquals(expectedValue, producerProperties.get(ProducerConfig.RETRIES_CONFIG))
@@ -183,7 +179,7 @@ class TestKafkaConfig {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
kafkaProducerConfig.getProducerProperties
}
@@ -193,7 +189,7 @@ class TestKafkaConfig {
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
- val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+ val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
kafkaProducerConfig.getProducerProperties
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index ece0359..3b3ed3d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -41,11 +41,12 @@ class TestKafkaSystemConsumer {
private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
+ private val clientId = "TestClientId"
@Test
def testFetchThresholdShouldDivideEvenlyAmongPartitions {
val metadataStore = new MockMetadataStore
- val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
override def refreshBrokers {
}
}
@@ -70,7 +71,7 @@ class TestKafkaSystemConsumer {
val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
var hosts = List[String]()
var getHostPortCount = 0
- val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore) {
+ val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
// Generate a unique host every time getHostPort is called.
getHostPortCount += 1
@@ -106,7 +107,7 @@ class TestKafkaSystemConsumer {
when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
val metadataStore = new MockMetadataStore
- val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000)
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
@@ -125,7 +126,7 @@ class TestKafkaSystemConsumer {
@Test
def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
val metadataStore = new MockMetadataStore
- val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
override def refreshBrokers {
}
@@ -144,7 +145,7 @@ class TestKafkaSystemConsumer {
@Test
def testFetchThresholdBytes {
val metadataStore = new MockMetadataStore
- val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
override def refreshBrokers {
}
@@ -168,7 +169,7 @@ class TestKafkaSystemConsumer {
@Test
def testFetchThresholdBytesDisabled {
val metadataStore = new MockMetadataStore
- val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
override def refreshBrokers {
}
[2/3] samza git commit: SAMZA-982 - Add null check for offset update in OffsetManager
Posted by na...@apache.org.
SAMZA-982 - Add null check for offset update in OffsetManager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b3d5c4cc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b3d5c4cc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b3d5c4cc
Branch: refs/heads/master
Commit: b3d5c4cc0d2babedc48015aebbbec6fb99d2a1ce
Parents: 74b0f84
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 13:10:00 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 13:10:43 2016 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/samza/checkpoint/OffsetManager.scala | 4 +++-
.../scala/org/apache/samza/checkpoint/TestOffsetManager.scala | 2 ++
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index f8033c5..c41eadb 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -186,7 +186,9 @@ class OffsetManager(
*/
def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
- lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+ if (offset != null) {
+ lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 75ba8af..cb78223 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -81,6 +81,8 @@ class TestOffsetManager {
assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
// Should never update starting offset.
assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
+ // Should not update null offset
+ offsetManager.update(taskName, systemStreamPartition, null)
offsetManager.checkpoint(taskName)
val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))