You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:25 UTC
[28/47] samza git commit: added eventProcessed sync
added eventProcessed sync
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b5ce9b38
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b5ce9b38
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b5ce9b38
Branch: refs/heads/NewKafkaSystemConsumer
Commit: b5ce9b38da88318a625f1dd7a6d35b9ed14ca04b
Parents: 19ba300
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 4 17:22:16 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 4 17:22:16 2018 -0700
----------------------------------------------------------------------
.../apache/samza/checkpoint/OffsetManager.scala | 4 ++--
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../org/apache/samza/job/local/ThreadJob.scala | 5 +----
.../samza/job/local/ThreadJobFactory.scala | 6 +++++-
.../apache/samza/job/local/TestThreadJob.scala | 9 --------
.../samza/system/kafka/KafkaConsumerProxy.java | 22 ++++++++++++++------
.../system/kafka/NewKafkaSystemConsumer.java | 18 +++++++++-------
.../test/integration/StreamTaskTestUtil.scala | 17 +++++++++++++--
.../integration/TestShutdownStatefulTask.scala | 6 +-----
9 files changed, 52 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 53d5e98..d2b6667 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -304,7 +304,7 @@ class OffsetManager(
*/
private def loadOffsetsFromCheckpointManager {
if (checkpointManager != null) {
- debug("Loading offsets from checkpoint manager.")
+ info("Loading offsets from checkpoint manager.")
checkpointManager.start
val result = systemStreamPartitions
@@ -332,7 +332,7 @@ class OffsetManager(
* Loads last processed offsets for a single taskName.
*/
private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = {
- debug("Loading checkpoints for taskName: %s." format taskName)
+ info("Loading checkpoints for taskName: %s." format taskName)
val checkpoint = checkpointManager.readLastCheckpoint(taskName)
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0c889d2..d02660b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -825,7 +825,7 @@ class SamzaContainer(
}
try {
- info("Shutting down.")
+ info("Shutting down Samza.")
removeShutdownHook
jmxServer.stop
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index 33dde52..a61a297 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,12 +19,11 @@
package org.apache.samza.job.local
-import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, StreamJob}
import org.apache.samza.util.Logging
-class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging {
+class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
@volatile var jobStatus: Option[ApplicationStatus] = None
var thread: Thread = null
@@ -44,8 +43,6 @@ class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extend
jobStatus = Some(UnsuccessfulFinish)
throw e
}
- } finally {
- jobModelManager.stop
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 4b08721..34cc2a0 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
package org.apache.samza.job.local
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
import org.apache.samza.config.{Config, TaskConfigJava}
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
@@ -65,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
if (checkpointManager != null) {
checkpointManager.createResources()
+ checkpointManager.stop()
}
ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
@@ -110,10 +113,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
taskFactory)
container.setContainerListener(containerListener)
- val threadJob = new ThreadJob(container, coordinator)
+ val threadJob = new ThreadJob(container)
threadJob
} finally {
coordinator.stop
+ coordinatorStreamManager.stop
jmxServer.stop
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index b1de215..4f3f511 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -19,7 +19,6 @@
package org.apache.samza.job.local
-import org.apache.samza.coordinator.JobModelManager
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.job.ApplicationStatus
@@ -30,10 +29,6 @@ class TestThreadJob {
val job = new ThreadJob(new Runnable {
override def run {
}
- }, new JobModelManager(null) {
- override def stop: Unit = {
-
- }
})
job.submit
job.waitForFinish(999999)
@@ -45,10 +40,6 @@ class TestThreadJob {
override def run {
Thread.sleep(999999)
}
- }, new JobModelManager(null) {
- override def stop: Unit = {
-
- }
})
job.submit
job.waitForFinish(500)
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index e61e0ff..cddfdfd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -72,7 +72,7 @@ public class KafkaConsumerProxy<K, V> {
private volatile boolean isRunning = false;
private volatile Throwable failureCause = null;
- private CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
+ private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
@@ -93,19 +93,26 @@ public class KafkaConsumerProxy<K, V> {
public void start() {
if (!consumerPollThread.isAlive()) {
- LOG.info("Starting LiKafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
+ LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
consumerPollThread.setDaemon(true);
consumerPollThread.setName(
- "Samza LiKafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
+ "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
consumerPollThread.start();
+ System.out.println("THREAD: starting" + consumerPollThread.getName());
+
+
// we need to wait until the thread starts
while (!isRunning) {
try {
consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
+ LOG.info("WTH");
}
}
+ new Exception().printStackTrace(System.out);
+ System.out.println("THREAD: started" + consumerPollThread.getName());
+
} else {
LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString());
}
@@ -135,12 +142,15 @@ public class KafkaConsumerProxy<K, V> {
return () -> {
isRunning = true;
+
try {
consumerPollThreadStartLatch.countDown();
+ System.out.println("THREAD: runing " + consumerPollThread.getName());
initializeLags();
while (isRunning) {
fetchMessages();
}
+ System.out.println("THREAD: finished " + consumerPollThread.getName());
} catch (Throwable throwable) {
LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
// SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
@@ -164,7 +174,7 @@ public class KafkaConsumerProxy<K, V> {
// If the message we are about to consume is < end offset, we are starting with a lag.
long initialLag = endOffsets.get(tp) - startingOffset;
- LOG.info("Initial lag is {} for SSP {}", initialLag, ssp);
+ LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
latestLags.put(ssp, initialLag);
sink.setIsAtHighWatermark(ssp, initialLag == 0);
});
@@ -446,13 +456,13 @@ public class KafkaConsumerProxy<K, V> {
}
public void stop(long timeout) {
- LOG.info("Shutting down LiKafkaConsumerProxy poll thread:" + toString());
+ System.out.println("THREAD: Shutting down LiKafkaConsumerProxy poll thread:" + consumerPollThread.getName());
isRunning = false;
try {
consumerPollThread.join(timeout);
} catch (InterruptedException e) {
- LOG.warn("Join in LiKafkaConsumerProxy has failed", e);
+ LOG.warn("Join in KafkaConsumerProxy has failed", e);
consumerPollThread.interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index aeeadce..b33db42 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -103,13 +103,16 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config,
String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
- System.out.println("GETTING FOR " + systemName);
- System.out.printf("RETURNING NEW ONE");
+
// extract consumer configs and create kafka consumer
KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
- return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
+
+ NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
+ System.out.println("kc=" + kc + "!!!!!!!!!!!!!!!!!GETTING FOR NKC for " + systemName);
+
+ return kc;
}
/**
@@ -254,7 +257,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
@Override
public void stop() {
- System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer);
+ System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!! stopping "+ "; kc=" + kafkaConsumer);
+ System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!!TPs = " + topicPartitions2Offset);
if (!stopped.compareAndSet(false, true)) {
LOG.warn("attempting to stop stopped consumer.");
@@ -300,7 +304,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
topicPartitions2SSP.put(tp, systemStreamPartition);
- LOG.info("==============>registering ssp = " + systemStreamPartition + " with offset " + offset);
+ LOG.info("============>registering ssp = " + systemStreamPartition + " with offset " + offset + "; kc=" + this);
String existingOffset = topicPartitions2Offset.get(tp);
// register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
@@ -348,8 +352,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
}
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
- LOG.info("=============================>. Res for " + systemStreamPartitions);
- LOG.info("=============================>. Res:" + res.toString());
+ //LOG.info("=============================>. Res for " + systemStreamPartitions);
+ //LOG.info("=============================>. Res:" + res.toString());
return res;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 864d2e5..2ea9a5f 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.JaasUtils
import org.apache.samza.config._
import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
import org.apache.samza.job.model.{ContainerModel, JobModel}
import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
import org.apache.samza.metrics.MetricsRegistryMap
@@ -223,9 +223,16 @@ class StreamTaskTestUtil {
* interrupt, which is forwarded on to ThreadJob, and marked as a failure).
*/
def stopJob(job: StreamJob) {
+ // make sure we don't kill the job before it was started
+ val tasks = TestTask.tasks
+ val task = tasks.values.toList.head
+ task.eventProcessed.await(60, TimeUnit.SECONDS)
+ System.out.println("THREAD: JOB KILL BEFORE")
// Shutdown task.
job.kill
+ System.out.println("THREAD: JOB KILL")
val status = job.waitForFinish(60000)
+ System.out.println("THREAD: JOB KILL WAIT")
assertEquals(ApplicationStatus.UnsuccessfulFinish, status)
}
@@ -279,7 +286,10 @@ class StreamTaskTestUtil {
val taskConfig = new TaskConfig(jobModel.getConfig)
val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
checkpointManager match {
- case Some(checkpointManager) => checkpointManager.createResources
+ case Some(checkpointManager) => {
+ checkpointManager.createResources
+ checkpointManager.stop
+ }
case _ => assert(checkpointManager != null, "No checkpoint manager factory configured")
}
@@ -323,6 +333,7 @@ object TestTask {
abstract class TestTask extends StreamTask with InitableTask {
var received = ArrayBuffer[String]()
val initFinished = new CountDownLatch(1)
+ val eventProcessed = new CountDownLatch(1)
@volatile var gotMessage = new CountDownLatch(1)
def init(config: Config, context: TaskContext) {
@@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask {
def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
val msg = envelope.getMessage.asInstanceOf[String]
+ eventProcessed.countDown()
+
System.err.println("TestTask.process(): %s" format msg)
received += msg
http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index a42433c..ccb7cd4 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -77,20 +77,16 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
val (job, task) = startJob
// Validate that restored is empty.
- assertEquals(0, task.initFinished.getCount)
assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size)
assertEquals(0, task.received.size)
// Send some messages to input stream.
- System.out.println("************************BEFORE DONE sending")
send(task, "1")
- System.out.println("************************FIRST DONE sending")
send(task, "2")
send(task, "3")
send(task, "2")
send(task, "99")
send(task, "99")
- System.out.println("************************DONE sending")
stopJob(job)
}
@@ -122,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask {
.asInstanceOf[KeyValueStore[String, String]]
val iter = store.all
iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
- System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored)
+ System.out.println("ShutdownStateStoreTask.createStream(): %s" format restored)
iter.close
}