You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/03/10 00:13:30 UTC
[samza] branch master updated: SAMZA-2317: ProcessJob does not call
CoordinatorStreamStore.close() (#1289)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0dafc28 SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() (#1289)
0dafc28 is described below
commit 0dafc283bf3b7db0432d7350c8880754951cc8f0
Author: PanTheMan <er...@gmail.com>
AuthorDate: Mon Mar 9 17:13:23 2020 -0700
SAMZA-2317: ProcessJob does not call CoordinatorStreamStore.close() (#1289)
---
.../metadatastore/CoordinatorStreamStore.java | 2 +-
.../org/apache/samza/job/local/ProcessJob.scala | 7 +++-
.../apache/samza/job/local/ProcessJobFactory.scala | 2 +-
.../apache/samza/job/local/TestProcessJob.scala | 44 +++++++++++++++++++++-
4 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 24ce457..5af0b31 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -94,7 +94,7 @@ public class CoordinatorStreamStore implements MetadataStore {
}
@VisibleForTesting
- CoordinatorStreamStore(Config config, SystemProducer systemProducer, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+ protected CoordinatorStreamStore(Config config, SystemProducer systemProducer, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
this.config = config;
this.systemConsumer = systemConsumer;
this.systemProducer = systemProducer;
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index f719220..9e02852 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -22,6 +22,7 @@ package org.apache.samza.job.local
import java.util.concurrent.CountDownLatch
import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
import org.apache.samza.util.Logging
@@ -41,7 +42,10 @@ object ProcessJob {
}
}
-class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager: JobModelManager) extends StreamJob with Logging {
+class ProcessJob(
+ commandBuilder: CommandBuilder,
+ val jobModelManager: JobModelManager,
+ val coordinatorStreamStore: CoordinatorStreamStore) extends StreamJob with Logging {
import ProcessJob._
@@ -72,6 +76,7 @@ class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager: JobModelMa
case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage))
} finally {
jobModelManager.stop
+ coordinatorStreamStore.close
setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index ca82892..fab8c6e 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -113,6 +113,6 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
.setId("0")
.setUrl(jobModelManager.server.getUrl)
- new ProcessJob(commandBuilder, jobModelManager)
+ new ProcessJob(commandBuilder, jobModelManager, coordinatorStreamStore)
}
}
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index dc87583..d4f4d41 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -19,9 +19,15 @@
package org.apache.samza.job.local
+import com.google.common.collect.ImmutableMap
+import org.apache.samza.config.MapConfig
import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.CommandBuilder
+import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemProducer}
+import org.apache.samza.util.NoOpMetricsRegistry
import org.junit.Assert._
import org.junit.Test
@@ -34,6 +40,8 @@ object TestProcessJob {
val SimpleCommand = "true"
val FailingCommand = "false"
val BadCommand = "bad-non-existing-command"
+ val MockSystemName = "test-kafka"
+ val MockConfigs = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", MockSystemName))
private def createProcessJob(command: String): ProcessJob = {
val commandBuilder = new CommandBuilder {
@@ -41,12 +49,26 @@ object TestProcessJob {
override def buildEnvironment = Map[String, String]().asJava
}
- new ProcessJob(commandBuilder, new MockJobModelManager)
+ // Setup for mocking a CoordinatorStreamStore
+ val systemFactory = new MockCoordinatorStreamSystemFactory
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache
+ val systemConsumer = systemFactory.getConsumer(MockSystemName, MockConfigs, new NoOpMetricsRegistry)
+ val systemProducer = systemFactory.getProducer(MockSystemName, MockConfigs, new NoOpMetricsRegistry)
+ val systemAdmin = systemFactory.getAdmin(MockSystemName, MockConfigs)
+
+ new ProcessJob(
+ commandBuilder,
+ new MockJobModelManager,
+ new MockCoordinateStreamStore(MockConfigs, systemProducer, systemConsumer, systemAdmin))
}
private def getMockJobModelManager(processJob: ProcessJob): MockJobModelManager = {
processJob.jobModelManager.asInstanceOf[MockJobModelManager]
}
+
+ private def getMockCoordinatorStreamStore(processJob: ProcessJob): MockCoordinateStreamStore = {
+ processJob.coordinatorStreamStore.asInstanceOf[MockCoordinateStreamStore]
+ }
}
class TestProcessJob {
@@ -61,6 +83,7 @@ class TestProcessJob {
assertEquals(SuccessfulFinish, status)
assertTrue(getMockJobModelManager(processJob).stopped)
+ assertTrue(getMockCoordinatorStreamStore(processJob).closed)
}
@Test
@@ -71,6 +94,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, status)
assertTrue(getMockJobModelManager(processJob).stopped)
+ assertTrue(getMockCoordinatorStreamStore(processJob).closed)
}
@Test
@@ -91,6 +115,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
+ assertTrue(getMockCoordinatorStreamStore(processJob).closed)
}
@Test
@@ -101,6 +126,7 @@ class TestProcessJob {
assertEquals(UnsuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
+ assertTrue(getMockCoordinatorStreamStore(processJob).closed)
}
@Test
@@ -111,6 +137,7 @@ class TestProcessJob {
assertEquals(SuccessfulFinish, processJob.getStatus)
assertTrue(getMockJobModelManager(processJob).stopped)
+ assertTrue(getMockCoordinatorStreamStore(processJob).closed)
}
@Test
@@ -145,3 +172,18 @@ class MockJobModelManager extends JobModelManager(null, null) {
stopped = true
}
}
+
+class MockCoordinateStreamStore(
+ config: MapConfig,
+ systemProducer: SystemProducer,
+ systemConsumer: SystemConsumer,
+ systemAdmin: SystemAdmin)
+ extends CoordinatorStreamStore(config, systemProducer, systemConsumer, systemAdmin) {
+ var closed: Boolean = false
+
+ override def init: Unit = {}
+
+ override def close: Unit = {
+ closed = true
+ }
+}
\ No newline at end of file