You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/03/10 00:13:30 UTC

[samza] branch master updated: SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() (#1289)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dafc28  SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() (#1289)
0dafc28 is described below

commit 0dafc283bf3b7db0432d7350c8880754951cc8f0
Author: PanTheMan <er...@gmail.com>
AuthorDate: Mon Mar 9 17:13:23 2020 -0700

    SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() (#1289)
---
 .../metadatastore/CoordinatorStreamStore.java      |  2 +-
 .../org/apache/samza/job/local/ProcessJob.scala    |  7 +++-
 .../apache/samza/job/local/ProcessJobFactory.scala |  2 +-
 .../apache/samza/job/local/TestProcessJob.scala    | 44 +++++++++++++++++++++-
 4 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 24ce457..5af0b31 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -94,7 +94,7 @@ public class CoordinatorStreamStore implements MetadataStore {
   }
 
   @VisibleForTesting
-  CoordinatorStreamStore(Config config, SystemProducer systemProducer, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+  protected CoordinatorStreamStore(Config config, SystemProducer systemProducer, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
     this.config = config;
     this.systemConsumer = systemConsumer;
     this.systemProducer = systemProducer;
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index f719220..9e02852 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job.local
 import java.util.concurrent.CountDownLatch
 
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
 import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
 import org.apache.samza.util.Logging
@@ -41,7 +42,10 @@ object ProcessJob {
   }
 }
 
-class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager: JobModelManager) extends StreamJob with Logging {
+class ProcessJob(
+  commandBuilder: CommandBuilder,
+  val jobModelManager: JobModelManager,
+  val coordinatorStreamStore: CoordinatorStreamStore) extends StreamJob with Logging {
 
   import ProcessJob._
 
@@ -72,6 +76,7 @@ class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager: JobModelMa
           case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage))
         } finally {
           jobModelManager.stop
+          coordinatorStreamStore.close
           setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
         }
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index ca82892..fab8c6e 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -113,6 +113,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
       .setId("0")
       .setUrl(jobModelManager.server.getUrl)
 
-    new ProcessJob(commandBuilder, jobModelManager)
+    new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore)
   }
 }
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index dc87583..d4f4d41 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -19,9 +19,15 @@
 
 package org.apache.samza.job.local
 
+import com.google.common.collect.ImmutableMap
+import org.apache.samza.config.MapConfig
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.CommandBuilder
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemProducer}
+import org.apache.samza.util.NoOpMetricsRegistry
 import org.junit.Assert._
 import org.junit.Test
 
@@ -34,6 +40,8 @@ object TestProcessJob {
   val SimpleCommand = "true"
   val FailingCommand = "false"
   val BadCommand = "bad-non-existing-command"
+  val MockSystemName = "test-kafka"
+  val MockConfigs = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", MockSystemName))
 
   private def createProcessJob(command: String): ProcessJob = {
     val commandBuilder = new CommandBuilder {
@@ -41,12 +49,26 @@ object TestProcessJob {
 
       override def buildEnvironment = Map[String, String]().asJava
     }
-    new ProcessJob(commandBuilder, new MockJobModelManager)
+    // Setup for mocking a CoordinatorStreamStore
+    val systemFactory = new MockCoordinatorStreamSystemFactory
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache
+    val systemConsumer = systemFactory.getConsumer(MockSystemName, MockConfigs, new NoOpMetricsRegistry)
+    val systemProducer = systemFactory.getProducer(MockSystemName, MockConfigs, new NoOpMetricsRegistry)
+    val systemAdmin = systemFactory.getAdmin(MockSystemName, MockConfigs)
+
+    new ProcessJob(
+      commandBuilder,
+      new MockJobModelManager,
+      new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin))
   }
 
   private def getMockJobModelManager(processJob: ProcessJob): MockJobModelManager = {
     processJob.jobModelManager.asInstanceOf[MockJobModelManager]
   }
+
+  private def getMockCoordinatorStreamStore(processJob: ProcessJob): MockCoordinateStreamStore = {
+    processJob.coordinatorStreamStore.asInstanceOf[MockCoordinateStreamStore]
+  }
 }
 
 class TestProcessJob {
@@ -61,6 +83,7 @@ class TestProcessJob {
 
     assertEquals(SuccessfulFinish, status)
     assertTrue(getMockJobModelManager(processJob).stopped)
+    assertTrue(getMockCoordinatorStreamStore(processJob).closed)
   }
 
   @Test
@@ -71,6 +94,7 @@ class TestProcessJob {
 
     assertEquals(UnsuccessfulFinish, status)
     assertTrue(getMockJobModelManager(processJob).stopped)
+    assertTrue(getMockCoordinatorStreamStore(processJob).closed)
   }
 
   @Test
@@ -91,6 +115,7 @@ class TestProcessJob {
 
     assertEquals(UnsuccessfulFinish, processJob.getStatus)
     assertTrue(getMockJobModelManager(processJob).stopped)
+    assertTrue(getMockCoordinatorStreamStore(processJob).closed)
   }
 
   @Test
@@ -101,6 +126,7 @@ class TestProcessJob {
 
     assertEquals(UnsuccessfulFinish, processJob.getStatus)
     assertTrue(getMockJobModelManager(processJob).stopped)
+    assertTrue(getMockCoordinatorStreamStore(processJob).closed)
   }
 
   @Test
@@ -111,6 +137,7 @@ class TestProcessJob {
 
     assertEquals(SuccessfulFinish, processJob.getStatus)
     assertTrue(getMockJobModelManager(processJob).stopped)
+    assertTrue(getMockCoordinatorStreamStore(processJob).closed)
   }
 
   @Test
@@ -145,3 +172,18 @@ class MockJobModelManager extends JobModelManager(null, null) {
     stopped = true
   }
 }
+
+class MockCoordinateStreamStore(
+  config: MapConfig,
+  systemProducer: SystemProducer,
+  systemConsumer: SystemConsumer,
+  systemAdmin: SystemAdmin)
+  extends CoordinatorStreamStore(config, systemProducer, systemConsumer, systemAdmin) {
+  var closed: Boolean = false
+
+  override def init: Unit = {}
+
+  override def close: Unit = {
+    closed = true
+  }
+}
\ No newline at end of file