Posted to commits@samza.apache.org by na...@apache.org on 2016/07/29 20:31:45 UTC

[1/3] samza git commit: SAMZA-979 - Remove KafkaCheckpointMigration

Repository: samza
Updated Branches:
  refs/heads/master 97a3be397 -> 944dd02e1


SAMZA-979 - Remove KafkaCheckpointMigration


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74b0f840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74b0f840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74b0f840

Branch: refs/heads/master
Commit: 74b0f8405a73fedda7a0b4b517f3946027d15626
Parents: 97a3be3
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 12:58:13 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 12:58:23 2016 -0700

----------------------------------------------------------------------
 .../apache/samza/checkpoint/OffsetManager.scala |   1 -
 .../scala/org/apache/samza/job/JobRunner.scala  |   4 -
 .../samza/migration/JobRunnerMigration.scala    |  58 -----
 .../org/apache/samza/job/TestJobRunner.scala    |  20 +-
 .../migration/KafkaCheckpointMigration.scala    | 149 ------------
 .../TestKafkaCheckpointMigration.scala          | 243 -------------------
 6 files changed, 1 insertion(+), 474 deletions(-)
----------------------------------------------------------------------
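
For context, the deleted JobRunnerMigration dispatched purely on checkpoint configuration. The keys below are taken from the deleted source; the two shapes are illustrative of how a job used to opt in to, or out of, the 0.10.0 checkpoint migration on submission:

    # Kafka checkpointing: migration ran automatically when the job was submitted
    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory

    # Any other checkpoint factory: migration had to be skipped explicitly,
    # otherwise JobRunner failed with a SamzaException
    task.checkpoint.skip-migration=true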


http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 7245902..f8033c5 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -223,7 +223,6 @@ class OffsetManager(
         }
       }
 
-      partitionOffsets.foreach(p => info("task " + taskName + " checkpoint " + p._1 + ", " + p._2))
       checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
       Option(lastProcessedOffsets.get(taskName)) match {
         case Some(sspToOffsets) => sspToOffsets.foreach { case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a3613ff..383bb13 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.migration.JobRunnerMigration
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -122,9 +121,6 @@ class JobRunner(config: Config) extends Logging {
     }
     coordinatorSystemProducer.stop
 
-    // Perform any migration plan to run in job runner
-    JobRunnerMigration(config)
-
     // Create the actual job, and submit it.
     val job = jobFactory.getJob(config).submit
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala b/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
deleted file mode 100644
index f38b87a..0000000
--- a/samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.migration
-
-import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging}
-import org.apache.samza.SamzaException
-
-
-object JobRunnerMigration {
-  val CHECKPOINT_MIGRATION = "org.apache.samza.migration.KafkaCheckpointMigration"
-  val UNSUPPORTED_ERROR_MSG = "Auto checkpoint migration for 0.10.0 upgrade is only supported for Kafka checkpointing system, " +
-    "for everything else, please use the checkpoint tool to migrate the taskname-to-changelog mapping, and add " +
-    "task.checkpoint.skip-migration=true to your configs."
-  def apply(config: Config) = {
-    val migration = new JobRunnerMigration
-    migration.checkpointMigration(config)
-  }
-}
-
-class JobRunnerMigration extends Logging {
-
-  def checkpointMigration(config: Config) = {
-    val checkpointFactory = Option(config.get("task.checkpoint.factory"))
-    checkpointFactory match {
-      case Some("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory") =>
-        info("Performing checkpoint migration")
-        val checkpointMigrationPlan = Util.getObj[MigrationPlan](JobRunnerMigration.CHECKPOINT_MIGRATION)
-        checkpointMigrationPlan.migrate(config)
-      case None =>
-        info("No task.checkpoint.factory defined, not performing any checkpoint migration")
-      case _ =>
-        val skipMigration = config.getBoolean("task.checkpoint.skip-migration", false)
-        if (skipMigration) {
-          info("Job is configured to skip any checkpoint migration.")
-        } else {
-          error(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-          throw new SamzaException(JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index e97656a..17c5297 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -21,13 +21,11 @@ package org.apache.samza.job
 
 import java.io.File
 
-import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.migration.JobRunnerMigration
-import org.junit.Test
 import org.junit.After
 import org.junit.Assert._
+import org.junit.Test
 
 object TestJobRunner {
   var processCount = 0
@@ -41,22 +39,6 @@ class TestJobRunner {
   }
 
   @Test
-  def testJobRunnerMigrationFails {
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-    try {
-      JobRunner.main(Array(
-        "--config-factory",
-        "org.apache.samza.config.factories.PropertiesConfigFactory",
-        "--config-path",
-        "file://%s/src/test/resources/test-migration-fail.properties" format new File(".").getCanonicalPath))
-      fail("Should have failed already.")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, JobRunnerMigration.UNSUPPORTED_ERROR_MSG)
-    }
-  }
-
-  @Test
   def testJobRunnerWorks {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala b/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
deleted file mode 100644
index 5d2641a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointManagerFactory}
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.coordinator.stream.messages.{CoordinatorStreamMessage, SetMigrationMetaMessage}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemFactory, CoordinatorStreamSystemProducer}
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.util._
-
-/**
- * Migrates changelog partition mapping from checkpoint topic to coordinator stream
- */
-class KafkaCheckpointMigration extends MigrationPlan with Logging {
-  val source = "CHECKPOINTMIGRATION"
-  val migrationKey = "CheckpointMigration09to10"
-  val migrationVal = "true"
-
-  var connectZk: () => ZkClient = null
-
-  private def getCheckpointSystemName(config: Config): String = {
-    config
-      .getCheckpointSystem
-      .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
-  }
-
-  private def getClientId(config: Config): String = {
-    KafkaUtil.getClientId("samza-checkpoint-manager", config)
-  }
-
-  private def getTopicMetadataStore(config: Config): TopicMetadataStore = {
-    val clientId = getClientId(config)
-    val systemName = getCheckpointSystemName(config)
-
-    val producerConfig = config.getKafkaSystemProducerConfig(
-      systemName,
-      clientId,
-      KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-
-    val consumerConfig = config.getKafkaSystemConsumerConfig(
-      systemName,
-      clientId)
-
-    new ClientUtilTopicMetadataStore(producerConfig.bootsrapServers, clientId, consumerConfig.socketTimeoutMs)
-  }
-
-  private def getConnectZk(config: Config): () => ZkClient = {
-    val clientId = getClientId(config)
-
-    val checkpointSystemName = getCheckpointSystemName(config)
-
-    val consumerConfig = config.getKafkaSystemConsumerConfig(
-      checkpointSystemName,
-      clientId)
-
-    val zkConnectString = Option(consumerConfig.zkConnect)
-      .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
-    () => {
-      new ZkClient(zkConnectString, 6000, 6000, ZKStringSerializer)
-    }
-  }
-
-  override def migrate(config: Config) {
-    val jobName = config.getName.getOrElse(throw new SamzaException("Cannot find job name. Cannot proceed with migration."))
-    val jobId = config.getJobId.getOrElse("1")
-
-    val checkpointTopicName = KafkaUtil.getCheckpointTopic(jobName, jobId)
-
-    val coordinatorSystemFactory = new CoordinatorStreamSystemFactory
-    val coordinatorSystemConsumer = coordinatorSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-    val coordinatorSystemProducer = coordinatorSystemFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-
-    val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
-    val kafkaUtil = new KafkaUtil(new ExponentialSleepStrategy, getConnectZk(config))
-
-    // make sure to validate that we only perform migration when checkpoint topic exists
-    if (kafkaUtil.topicExists(checkpointTopicName)) {
-      kafkaUtil.validateTopicPartitionCount(
-        checkpointTopicName,
-        getCheckpointSystemName(config),
-        getTopicMetadataStore(config),
-        1,
-        config.failOnCheckpointValidation)
-
-      if (migrationVerification(coordinatorSystemConsumer)) {
-        info("Migration %s was already performed, doing nothing" format migrationKey)
-        return
-      }
-
-      info("No previous migration for %s were detected, performing migration" format migrationKey)
-
-      info("Loading changelog partition mapping from checkpoint topic - %s" format checkpointTopicName)
-      val changelogMap = checkpointManager.readChangeLogPartitionMapping()
-      checkpointManager.stop
-
-      info("Writing changelog to coordinator stream topic - %s" format Util.getCoordinatorStreamName(jobName, jobId))
-      val changelogPartitionManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, source)
-      changelogPartitionManager.start()
-      changelogPartitionManager.writeChangeLogPartitionMapping(changelogMap)
-      changelogPartitionManager.stop()
-    }
-    migrationCompletionMark(coordinatorSystemProducer)
-
-  }
-
-  def migrationVerification(coordinatorSystemConsumer : CoordinatorStreamSystemConsumer): Boolean = {
-    coordinatorSystemConsumer.register()
-    coordinatorSystemConsumer.start()
-    coordinatorSystemConsumer.bootstrap()
-    val stream = coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
-    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
-    stream.contains(message.asInstanceOf[CoordinatorStreamMessage])
-  }
-
-  def migrationCompletionMark(coordinatorSystemProducer: CoordinatorStreamSystemProducer) = {
-    info("Marking completion of migration %s" format migrationKey)
-    val message = new SetMigrationMetaMessage(source, migrationKey, migrationVal)
-    coordinatorSystemProducer.register(source)
-    coordinatorSystemProducer.start()
-    coordinatorSystemProducer.send(message)
-    coordinatorSystemProducer.stop()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74b0f840/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala b/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
deleted file mode 100644
index 504fc89..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.migration
-
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.kafka.{KafkaCheckpointManager, KafkaCheckpointLogKey, KafkaCheckpointManagerFactory}
-import org.apache.samza.config._
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.coordinator.MockSystemFactory
-import org.apache.samza.coordinator.stream.messages.SetMigrationMetaMessage
-import org.apache.samza.coordinator.stream._
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.storage.ChangelogPartitionManager
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util._
-import org.apache.samza.Partition
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.JavaConversions._
-import scala.collection._
-
-class TestKafkaCheckpointMigration {
-
-  val checkpointTopic = "checkpoint-topic"
-  val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
-  val zkConnect: String = TestZKUtils.zookeeperConnect
-  var zkClient: ZkClient = null
-  val zkConnectionTimeout = 6000
-  val zkSessionTimeout = 6000
-
-  val brokerId1 = 0
-  val brokerId2 = 1
-  val brokerId3 = 2
-  val ports = TestUtils.choosePorts(3)
-  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
-  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  props1.put("controlled.shutdown.enable", "true")
-  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  props1.put("controlled.shutdown.enable", "true")
-  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  props1.put("controlled.shutdown.enable", "true")
-
-  val config = new java.util.HashMap[String, Object]()
-  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
-  config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-  config.put("acks", "all")
-  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
-  config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-  config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
-  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-  val partition = new Partition(0)
-  val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
-  var zookeeper: EmbeddedZookeeper = null
-  var server1: KafkaServer = null
-  var server2: KafkaServer = null
-  var server3: KafkaServer = null
-  var metadataStore: TopicMetadataStore = null
-
-  val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
-
-  @Before
-  def beforeSetupServers {
-    zookeeper = new EmbeddedZookeeper(zkConnect)
-    server1 = TestUtils.createServer(new KafkaConfig(props1))
-    server2 = TestUtils.createServer(new KafkaConfig(props2))
-    server3 = TestUtils.createServer(new KafkaConfig(props3))
-    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
-  }
-
-  @After
-  def afterCleanLogDirs {
-    server1.shutdown
-    server1.awaitShutdown()
-    server2.shutdown
-    server2.awaitShutdown()
-    server3.shutdown
-    server3.awaitShutdown()
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
-    zookeeper.shutdown
-  }
-
-  private def writeChangeLogPartitionMapping(changelogMapping: Map[TaskName, Integer], cpTopic: String = checkpointTopic) = {
-    val producer: Producer[Array[Byte], Array[Byte]] = new KafkaProducer(producerConfig.getProducerProperties)
-    val record = new ProducerRecord(
-      cpTopic,
-      0,
-      KafkaCheckpointLogKey.getChangelogPartitionMappingKey().toBytes(),
-      new CheckpointSerde().changelogPartitionMappingToBytes(changelogMapping)
-    )
-    try {
-      producer.send(record).get()
-    } catch {
-      case e: Exception => println(e.getMessage)
-    } finally {
-      producer.close()
-    }
-  }
-
-  @Test
-  def testMigrationWithNoCheckpointTopic() {
-    val mapConfig = Map[String, String](
-      "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-      JobConfig.JOB_NAME -> "test",
-      JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-      JobConfig.JOB_CONTAINER_COUNT -> "2",
-      "task.inputs" -> "test.stream1",
-      "task.checkpoint.system" -> "test",
-      SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      "systems.test.producer.bootstrap.servers" -> brokers,
-      "systems.test.consumer.zookeeper.connect" -> zkConnect,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
-
-    // Enable consumer caching
-    MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-    val config: MapConfig = new MapConfig(mapConfig)
-    val migrate = new KafkaCheckpointMigration
-    migrate.migrate(config)
-    val consumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, new NoOpMetricsRegistry)
-    consumer.register()
-    consumer.start()
-    consumer.bootstrap()
-    val bootstrappedStream = consumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE)
-    assertEquals(1, bootstrappedStream.size())
-
-    val expectedMigrationMessage = new SetMigrationMetaMessage("CHECKPOINTMIGRATION", "CheckpointMigration09to10", "true")
-    assertEquals(expectedMigrationMessage, bootstrappedStream.head)
-    consumer.stop()
-  }
-
-  @Test
-  def testMigration() {
-    try {
-      val mapConfig = Map(
-        "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
-        JobConfig.JOB_NAME -> "test",
-        JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
-        JobConfig.JOB_CONTAINER_COUNT -> "2",
-        "task.inputs" -> "test.stream1",
-        "task.checkpoint.system" -> "test",
-        "systems.test.producer.bootstrap.servers" -> brokers,
-        "systems.test.consumer.zookeeper.connect" -> zkConnect,
-        SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-        SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName)
-      // Enable consumer caching
-      MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
-
-      val config = new MapConfig(mapConfig)
-      val checkpointTopicName = KafkaUtil.getCheckpointTopic("test", "1")
-      val checkpointManager = new KafkaCheckpointManagerFactory().getCheckpointManager(config, new NoOpMetricsRegistry).asInstanceOf[KafkaCheckpointManager]
-
-      // Write a couple of checkpoints in the old checkpoint topic
-      val task1 = new TaskName(partition.toString)
-      val task2 = new TaskName(partition2.toString)
-      checkpointManager.start
-      checkpointManager.register(task1)
-      checkpointManager.register(task2)
-      checkpointManager.writeCheckpoint(task1, cp1)
-      checkpointManager.writeCheckpoint(task2, cp2)
-
-      // Write changelog partition info to the old checkpoint topic
-      val changelogMapping = Map(task1 -> 1.asInstanceOf[Integer], task2 -> 10.asInstanceOf[Integer])
-      writeChangeLogPartitionMapping(changelogMapping, checkpointTopicName)
-      checkpointManager.stop
-
-      // Initialize coordinator stream
-      val coordinatorFactory = new CoordinatorStreamSystemFactory()
-      val coordinatorSystemConsumer = coordinatorFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-      val coordinatorSystemProducer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-
-      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 0)
-      coordinatorSystemConsumer.stop
-
-      // Start the migration
-      val migrationInstance = new KafkaCheckpointMigration
-      migrationInstance.migrate(config)
-
-      // Verify if the changelogPartitionInfo has been migrated
-      val newChangelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "test")
-      newChangelogManager.start
-      val newChangelogMapping = newChangelogManager.readChangeLogPartitionMapping()
-      newChangelogManager.stop
-      assertEquals(newChangelogMapping.toMap, changelogMapping)
-
-      // Check for migration message
-      coordinatorSystemConsumer.register()
-      coordinatorSystemConsumer.start()
-      assertEquals(coordinatorSystemConsumer.getBootstrappedStream(SetMigrationMetaMessage.TYPE).size, 1)
-      coordinatorSystemConsumer.stop()
-    }
-    finally {
-      MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
-    }
-  }
-
-  class MockKafkaCheckpointMigration extends KafkaCheckpointMigration{
-    var migrationCompletionMarkFlag: Boolean = false
-    var migrationVerificationMarkFlag: Boolean = false
-
-    override def migrationCompletionMark(coordinatorStreamProducer: CoordinatorStreamSystemProducer) = {
-      migrationCompletionMarkFlag = true
-      super.migrationCompletionMark(coordinatorStreamProducer)
-    }
-
-    override def migrationVerification(coordinatorStreamConsumer: CoordinatorStreamSystemConsumer): Boolean = {
-      migrationVerificationMarkFlag = true
-      super.migrationVerification(coordinatorStreamConsumer)
-    }
-  }
-}


[3/3] samza git commit: SAMZA-981 - Set consistent Kafka clientId for a job instance

Posted by na...@apache.org.
SAMZA-981 - Set consistent Kafka clientId for a job instance


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/944dd02e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/944dd02e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/944dd02e

Branch: refs/heads/master
Commit: 944dd02e1d00bcce59f1fcc33ecbb2a8acd95870
Parents: b3d5c4c
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 13:11:36 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 13:11:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   |  4 +-
 .../samza/config/RegExTopicGenerator.scala      |  2 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  2 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |  6 +--
 .../apache/samza/config/TestKafkaConfig.scala   | 44 +++++++++-----------
 .../system/kafka/TestKafkaSystemConsumer.scala  | 13 +++---
 6 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
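
The core of this change is in KafkaUtil.getClientId (diff below): the System.currentTimeMillis and counter suffixes are dropped, so a given (id, jobName, jobId) always maps to the same clientId. A minimal standalone sketch of the new format — the object wrapper and main method are illustrative, only the getClientId body mirrors the committed code:

    object ClientIdFormatSketch {
      // Deterministic per (id, jobName, jobId); non-alphanumerics become underscores.
      def getClientId(id: String, jobName: String, jobId: String): String =
        "%s-%s-%s" format (
          id.replaceAll("[^A-Za-z0-9]", "_"),
          jobName.replaceAll("[^A-Za-z0-9]", "_"),
          jobId.replaceAll("[^A-Za-z0-9]", "_"))

      def main(args: Array[String]): Unit = {
        println(getClientId("samza-consumer", "my-job", "1")) // samza_consumer-my_job-1
        println(getClientId("samza-consumer", "my-job", "1")) // identical on every call
      }
    }

This is also why the consumer and producer config methods in KafkaConfig lose their random-UUID defaults below: callers must now pass a clientId derived from the job instance.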


http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1822511..973ab8c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -148,7 +148,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // kafka config
   def getKafkaSystemConsumerConfig(
     systemName: String,
-    clientId: String = "undefined-samza-consumer-%s" format UUID.randomUUID.toString,
+    clientId: String,
     groupId: String = "undefined-samza-consumer-group-%s" format UUID.randomUUID.toString,
     injectedProps: Map[String, String] = Map()) = {
 
@@ -163,7 +163,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getKafkaSystemProducerConfig(
     systemName: String,
-    clientId: String = "undefined-samza-producer-%s" format UUID.randomUUID.toString,
+    clientId: String,
     injectedProps: Map[String, String] = Map()) = {
 
     val subConf = config.subset("systems.%s.producer." format systemName, true)

http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 78467bf..bcbad27 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
     val systemName = config
       .getRegexResolvedSystem(rewriterName)
       .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
-    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName)
+    val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
     val zkConnect = Option(consumerConfig.zkConnect)
       .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
     val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)

http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index b373753..36567b6 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -64,7 +64,7 @@ private[kafka] class KafkaSystemConsumer(
   systemAdmin: SystemAdmin,
   metrics: KafkaSystemConsumerMetrics,
   metadataStore: TopicMetadataStore,
-  clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
+  clientId: String,
   timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
   fetchSize: StreamFetchSizes = new StreamFetchSizes,

http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index a25ba62..e95f052 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -45,12 +45,10 @@ object KafkaUtil extends Logging {
     config.getJobId.getOrElse("1"))
 
   def getClientId(id: String, jobName: String, jobId: String): String =
-    "%s-%s-%s-%s-%s" format
+    "%s-%s-%s" format
       (id.replaceAll("[^A-Za-z0-9]", "_"),
         jobName.replaceAll("[^A-Za-z0-9]", "_"),
-        jobId.replaceAll("[^A-Za-z0-9]", "_"),
-        System.currentTimeMillis,
-        counter.getAndIncrement)
+        jobId.replaceAll("[^A-Za-z0-9]", "_"))
 
   private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index c4a83f6..9c9c71e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -37,6 +37,8 @@ class TestKafkaConfig {
   var props : Properties = new Properties
   val SYSTEM_NAME = "kafka";
   val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer."
+  val TEST_CLIENT_ID = "TestClientId"
+  val TEST_GROUP_ID = "TestGroupId"
   
   @Before
   def setupProperties() {
@@ -53,52 +55,46 @@ class TestKafkaConfig {
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
 
-    val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+    val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
     val consumerClientId1 = consumerConfig1.clientId
     val groupId1 = consumerConfig1.groupId
-    val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+    val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId2")
     val consumerClientId2 = consumerConfig2.clientId
     val groupId2 = consumerConfig2.groupId
-    assert(consumerClientId1.startsWith("undefined-samza-consumer-"))
-    assert(consumerClientId2.startsWith("undefined-samza-consumer-"))
+    assert(consumerClientId1.equals("TestClientId1"))
+    assert(consumerClientId2.equals("TestClientId2"))
     assert(groupId1.startsWith("undefined-samza-consumer-group-"))
     assert(groupId2.startsWith("undefined-samza-consumer-group-"))
     assert(consumerClientId1 != consumerClientId2)
     assert(groupId1 != groupId2)
 
-    val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId", "TestGroupId")
+    val consumerConfig3 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID, TEST_GROUP_ID)
     val consumerClientId3 = consumerConfig3.clientId
     val groupId3 = consumerConfig3.groupId
-    assert(consumerClientId3 == "TestClientId")
-    assert(groupId3 == "TestGroupId")
+    assert(consumerClientId3.equals(TEST_CLIENT_ID))
+    assert(groupId3.equals(TEST_GROUP_ID))
 
-    val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val producerConfig1 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId1")
     val producerClientId1 = producerConfig1.clientId
-    val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId2")
     val producerClientId2 = producerConfig2.clientId
 
-    assert(producerClientId1.startsWith("undefined-samza-producer-"))
-    assert(producerClientId2.startsWith("undefined-samza-producer-"))
-    assert(producerClientId1 != producerClientId2)
-
-    val producerConfig3 = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, "TestClientId")
-    val producerClientId3 = producerConfig3.clientId
-    assert(producerClientId3 == "TestClientId")
-
+    assert(producerClientId1.equals("TestClientId1"))
+    assert(producerClientId2.equals("TestClientId2"))
   }
 
   @Test
   def testStreamLevelFetchSizeOverride() {
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // default fetch size
     assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
 
     props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
     val mapConfig1 = new MapConfig(props.toMap[String, String])
     val kafkaConfig1 = new KafkaConfig(mapConfig1)
-    val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME)
+    val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // shared fetch size
     assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
     
@@ -140,7 +136,7 @@ class TestKafkaConfig {
   def testDefaultValuesForProducerProperties() {
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
     
     assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
@@ -157,7 +153,7 @@ class TestKafkaConfig {
     
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
     
     assertEquals(expectedValue, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
@@ -171,7 +167,7 @@ class TestKafkaConfig {
     
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
     
     assertEquals(expectedValue, producerProperties.get(ProducerConfig.RETRIES_CONFIG))
@@ -183,7 +179,7 @@ class TestKafkaConfig {
     
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
   }
   
@@ -193,7 +189,7 @@ class TestKafkaConfig {
     
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME)
+    val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/944dd02e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index ece0359..3b3ed3d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -41,11 +41,12 @@ class TestKafkaSystemConsumer {
   private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
   private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
   private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
+  private val clientId = "TestClientId"
 
   @Test
   def testFetchThresholdShouldDivideEvenlyAmongPartitions {
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
       override def refreshBrokers {
       }
     }
@@ -70,7 +71,7 @@ class TestKafkaSystemConsumer {
     val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
     var hosts = List[String]()
     var getHostPortCount = 0
-    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore) {
+    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
       override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
         // Generate a unique host every time getHostPort is called.
         getHostPortCount += 1
@@ -106,7 +107,7 @@ class TestKafkaSystemConsumer {
     when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
 
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000)
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
     val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
     val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
     val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
@@ -125,7 +126,7 @@ class TestKafkaSystemConsumer {
   @Test
   def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
       fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
       override def refreshBrokers {
       }
@@ -144,7 +145,7 @@ class TestKafkaSystemConsumer {
   @Test
   def testFetchThresholdBytes {
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
       fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
       override def refreshBrokers {
       }
@@ -168,7 +169,7 @@ class TestKafkaSystemConsumer {
   @Test
   def testFetchThresholdBytesDisabled {
     val metadataStore = new MockMetadataStore
-    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
       fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
       override def refreshBrokers {
       }


[2/3] samza git commit: SAMZA-982 - Add null check for offset update in OffsetManager

Posted by na...@apache.org.
SAMZA-982 - Add null check for offset update in OffsetManager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b3d5c4cc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b3d5c4cc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b3d5c4cc

Branch: refs/heads/master
Commit: b3d5c4cc0d2babedc48015aebbbec6fb99d2a1ce
Parents: 74b0f84
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Fri Jul 29 13:10:00 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Fri Jul 29 13:10:43 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/checkpoint/OffsetManager.scala   | 4 +++-
 .../scala/org/apache/samza/checkpoint/TestOffsetManager.scala    | 2 ++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
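
Previously, OffsetManager.update forwarded the offset straight to ConcurrentHashMap.put, which rejects null values; with the guard, a null update is a no-op and the last non-null offset is what gets checkpointed. A minimal sketch of the guarded semantics, with TaskName and SystemStreamPartition simplified to plain Strings and all names hypothetical:

    import java.util.concurrent.ConcurrentHashMap

    object NullSafeUpdateSketch {
      private val lastProcessedOffsets =
        new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()

      def update(taskName: String, ssp: String, offset: String): Unit = {
        lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[String, String]())
        if (offset != null) { // the new guard: a null offset is ignored, not stored
          lastProcessedOffsets.get(taskName).put(ssp, offset)
        }
      }

      def main(args: Array[String]): Unit = {
        update("Partition 0", "kafka.topic.0", "47")
        update("Partition 0", "kafka.topic.0", null) // no-op; "47" survives
        println(lastProcessedOffsets.get("Partition 0").get("kafka.topic.0")) // 47
      }
    }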


http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index f8033c5..c41eadb 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -186,7 +186,9 @@ class OffsetManager(
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
-    lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+    if (offset != null) {
+      lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/b3d5c4cc/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 75ba8af..cb78223 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -81,6 +81,8 @@ class TestOffsetManager {
     assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
     // Should never update starting offset.
     assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
+    // Should not update null offset
+    offsetManager.update(taskName, systemStreamPartition, null)
     offsetManager.checkpoint(taskName)
     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))