You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/22 18:58:40 UTC

[GitHub] [samza] shanthoosh commented on a change in pull request #1043: SAMZA-2202: Create log compact topic with larger message size

shanthoosh commented on a change in pull request #1043: SAMZA-2202: Create log compact topic with larger message size
URL: https://github.com/apache/samza/pull/1043#discussion_r286606260
 
 

 ##########
 File path: samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
 ##########
 @@ -247,97 +252,115 @@ class TestKafkaConfig {
   }
 
   @Test
-  def testCheckpointReplicationFactor() {
+  def testCheckpointConfigs() {
     val emptyConfig = new KafkaConfig(new MapConfig())
     assertEquals("3", emptyConfig.getCheckpointReplicationFactor.orNull)
+    assertEquals(1000012, emptyConfig.getCheckpointMaxMessageBytes())
     assertNull(emptyConfig.getCheckpointSystem.orNull)
 
     props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
     props.setProperty("task.checkpoint.replication.factor", "4")
+    props.setProperty("task.checkpoint.max.message.bytes", "2048000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
     assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
+    assertEquals(2048000, kafkaConfig.getCheckpointMaxMessageBytes())
   }
 
   @Test
-  def testCheckpointReplicationFactorWithSystemDefault() {
+  def testCheckpointConfigsWithSystemDefault() {
     props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
     props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
     props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
+    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("other-kafka-system", kafkaConfig.getCheckpointSystem.orNull)
     assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull)
     assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
+    assertEquals(4096000, kafkaConfig.getCheckpointMaxMessageBytes())
   }
 
   @Test
-  def testCheckpointReplicationFactorWithSystemOverriddenDefault() {
+  def testCheckpointConfigsWithSystemOverriddenDefault() {
     // Defaults
     props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
     props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
     props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
 
     // Overrides
     props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
     props.setProperty(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, "4")
+    props.setProperty(KafkaConfig.CHECKPOINT_MAX_MESSAGE_BYTES, "8192000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
     assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
     assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
+    assertEquals(8192000, kafkaConfig.getCheckpointMaxMessageBytes())
   }
 
   @Test
-  def testCoordinatorReplicationFactor() {
+  def testCoordinatorConfigs() {
     val emptyConfig = new KafkaConfig(new MapConfig())
     assertEquals("3", emptyConfig.getCoordinatorReplicationFactor)
+    assertEquals("1000012", emptyConfig.getCoordinatorMaxMessageByte)
     assertNull(new JobConfig(new MapConfig()).getCoordinatorSystemNameOrNull)
 
     props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
     props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
+    props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "1024000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val jobConfig = new JobConfig(mapConfig)
     assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
     assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
+    assertEquals("1024000", kafkaConfig.getCoordinatorMaxMessageByte)
   }
 
   @Test
-  def testCoordinatorReplicationFactorWithSystemDefault() {
+  def testCoordinatorConfigsWithSystemDefault() {
     props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
     props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
     props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
+    props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "2048000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val jobConfig = new JobConfig(mapConfig)
     assertEquals("other-kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
     assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor)
     assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
+    assertEquals("2048000", kafkaConfig.getCoordinatorMaxMessageByte)
   }
 
   @Test
-  def testCoordinatorReplicationFactorWithSystemOverriddenDefault() {
+  def testCoordinatorConfigsWithSystemOverriddenDefault() {
     // Defaults
     props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
     props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
+    props.setProperty("systems.other-kafka-system.default.stream.max.message.byte", "2048000")
     props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
 
     // Overrides
     props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
     props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
+    props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "4096000")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val jobConfig = new JobConfig(mapConfig)
     assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
     assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
+    assertEquals("4096000", kafkaConfig.getCoordinatorMaxMessageByte)
     assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
   }
+
 
 Review comment:
   Minor: Please remove the extra empty line added after the closing brace of this test method.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services