You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:25 UTC

[28/47] samza git commit: added eventProcessed sync

added eventProcessed sync


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b5ce9b38
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b5ce9b38
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b5ce9b38

Branch: refs/heads/NewKafkaSystemConsumer
Commit: b5ce9b38da88318a625f1dd7a6d35b9ed14ca04b
Parents: 19ba300
Author: Boris S <bo...@apache.org>
Authored: Tue Sep 4 17:22:16 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Tue Sep 4 17:22:16 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/checkpoint/OffsetManager.scala |  4 ++--
 .../apache/samza/container/SamzaContainer.scala |  2 +-
 .../org/apache/samza/job/local/ThreadJob.scala  |  5 +----
 .../samza/job/local/ThreadJobFactory.scala      |  6 +++++-
 .../apache/samza/job/local/TestThreadJob.scala  |  9 --------
 .../samza/system/kafka/KafkaConsumerProxy.java  | 22 ++++++++++++++------
 .../system/kafka/NewKafkaSystemConsumer.java    | 18 +++++++++-------
 .../test/integration/StreamTaskTestUtil.scala   | 17 +++++++++++++--
 .../integration/TestShutdownStatefulTask.scala  |  6 +-----
 9 files changed, 52 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 53d5e98..d2b6667 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -304,7 +304,7 @@ class OffsetManager(
    */
   private def loadOffsetsFromCheckpointManager {
     if (checkpointManager != null) {
-      debug("Loading offsets from checkpoint manager.")
+      info("Loading offsets from checkpoint manager.")
 
       checkpointManager.start
       val result = systemStreamPartitions
@@ -332,7 +332,7 @@ class OffsetManager(
    * Loads last processed offsets for a single taskName.
    */
   private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = {
-    debug("Loading checkpoints for taskName: %s." format taskName)
+    info("Loading checkpoints for taskName: %s." format taskName)
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0c889d2..d02660b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -825,7 +825,7 @@ class SamzaContainer(
     }
 
     try {
-      info("Shutting down.")
+      info("Shutting down Samza.")
       removeShutdownHook
 
       jmxServer.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index 33dde52..a61a297 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,12 +19,11 @@
 
 package org.apache.samza.job.local
 
-import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.util.Logging
 
-class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging {
+class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
   @volatile var jobStatus: Option[ApplicationStatus] = None
   var thread: Thread = null
 
@@ -44,8 +43,6 @@ class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extend
             jobStatus = Some(UnsuccessfulFinish)
             throw e
           }
-        } finally {
-          jobModelManager.stop
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 4b08721..34cc2a0 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.local
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
@@ -65,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
     if (checkpointManager != null) {
       checkpointManager.createResources()
+      checkpointManager.stop()
     }
     ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
 
@@ -110,10 +113,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         taskFactory)
       container.setContainerListener(containerListener)
 
-      val threadJob = new ThreadJob(container, coordinator)
+      val threadJob = new ThreadJob(container)
       threadJob
     } finally {
       coordinator.stop
+      coordinatorStreamManager.stop
       jmxServer.stop
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index b1de215..4f3f511 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.job.local
 
-import org.apache.samza.coordinator.JobModelManager
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -30,10 +29,6 @@ class TestThreadJob {
     val job = new ThreadJob(new Runnable {
       override def run {
       }
-    }, new JobModelManager(null) {
-      override def stop: Unit = {
-
-      }
     })
     job.submit
     job.waitForFinish(999999)
@@ -45,10 +40,6 @@ class TestThreadJob {
       override def run {
         Thread.sleep(999999)
       }
-    }, new JobModelManager(null) {
-      override def stop: Unit = {
-
-      }
     })
     job.submit
     job.waitForFinish(500)

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index e61e0ff..cddfdfd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -72,7 +72,7 @@ public class KafkaConsumerProxy<K, V> {
 
   private volatile boolean isRunning = false;
   private volatile Throwable failureCause = null;
-  private CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
+  private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
 
   public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
       NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
@@ -93,19 +93,26 @@ public class KafkaConsumerProxy<K, V> {
 
   public void start() {
     if (!consumerPollThread.isAlive()) {
-      LOG.info("Starting LiKafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
+      LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
       consumerPollThread.setDaemon(true);
       consumerPollThread.setName(
-          "Samza LiKafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
+          "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
       consumerPollThread.start();
 
+      System.out.println("THREAD: starting" + consumerPollThread.getName());
+
+
       // we need to wait until the thread starts
       while (!isRunning) {
         try {
           consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
+          LOG.info("WTH");
         }
       }
+      new Exception().printStackTrace(System.out);
+      System.out.println("THREAD: started" + consumerPollThread.getName());
+
     } else {
       LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString());
     }
@@ -135,12 +142,15 @@ public class KafkaConsumerProxy<K, V> {
     return () -> {
       isRunning = true;
 
+
       try {
         consumerPollThreadStartLatch.countDown();
+        System.out.println("THREAD: runing " + consumerPollThread.getName());
         initializeLags();
         while (isRunning) {
           fetchMessages();
         }
+        System.out.println("THREAD: finished " + consumerPollThread.getName());
       } catch (Throwable throwable) {
         LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
         // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container
@@ -164,7 +174,7 @@ public class KafkaConsumerProxy<K, V> {
       // If the message we are about to consume is < end offset, we are starting with a lag.
       long initialLag = endOffsets.get(tp) - startingOffset;
 
-      LOG.info("Initial lag is {} for SSP {}", initialLag, ssp);
+      LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset);
       latestLags.put(ssp, initialLag);
       sink.setIsAtHighWatermark(ssp, initialLag == 0);
     });
@@ -446,13 +456,13 @@ public class KafkaConsumerProxy<K, V> {
   }
 
   public void stop(long timeout) {
-    LOG.info("Shutting down LiKafkaConsumerProxy poll thread:" + toString());
+    System.out.println("THREAD: Shutting down LiKafkaConsumerProxy poll thread:" + consumerPollThread.getName());
 
     isRunning = false;
     try {
       consumerPollThread.join(timeout);
     } catch (InterruptedException e) {
-      LOG.warn("Join in LiKafkaConsumerProxy has failed", e);
+      LOG.warn("Join in KafkaConsumerProxy has failed", e);
       consumerPollThread.interrupt();
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index aeeadce..b33db42 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -103,13 +103,16 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
   public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config,
       String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
 
-    System.out.println("GETTING FOR " + systemName);
 
-    System.out.printf("RETURNING NEW ONE");
+
     // extract consumer configs and create kafka consumer
     KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
 
-    return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
+
+    NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    System.out.println("kc=" + kc + "!!!!!!!!!!!!!!!!!GETTING FOR NKC for " + systemName);
+
+    return kc;
   }
 
   /**
@@ -254,7 +257,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
 
   @Override
   public void stop() {
-    System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer);
+    System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!! stopping "+ "; kc=" + kafkaConsumer);
+    System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!!TPs = " + topicPartitions2Offset);
 
     if (!stopped.compareAndSet(false, true)) {
       LOG.warn("attempting to stop stopped consumer.");
@@ -300,7 +304,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
 
     topicPartitions2SSP.put(tp, systemStreamPartition);
 
-    LOG.info("==============>registering ssp = " + systemStreamPartition + " with offset " + offset);
+    LOG.info("============>registering ssp = " + systemStreamPartition + " with offset " + offset + "; kc=" + this);
 
     String existingOffset = topicPartitions2Offset.get(tp);
     // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages.
@@ -348,8 +352,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements
     }
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
-    LOG.info("=============================>. Res for " + systemStreamPartitions);
-    LOG.info("=============================>. Res:" + res.toString());
+    //LOG.info("=============================>. Res for " + systemStreamPartitions);
+    //LOG.info("=============================>. Res:" + res.toString());
     return res;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 864d2e5..2ea9a5f 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.config._
 import org.apache.samza.container.TaskName
-import org.apache.samza.job.local.ThreadJobFactory
+import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory}
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -223,9 +223,16 @@ class StreamTaskTestUtil {
    * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
    */
   def stopJob(job: StreamJob) {
+    // make sure we don't kill the job before it was started
+    val tasks = TestTask.tasks
+    val task = tasks.values.toList.head
+    task.eventProcessed.await(60, TimeUnit.SECONDS)
+    System.out.println("THREAD: JOB KILL BEFORE")
     // Shutdown task.
     job.kill
+    System.out.println("THREAD: JOB KILL")
     val status = job.waitForFinish(60000)
+    System.out.println("THREAD: JOB KILL WAIT")
     assertEquals(ApplicationStatus.UnsuccessfulFinish, status)
   }
 
@@ -279,7 +286,10 @@ class StreamTaskTestUtil {
     val taskConfig = new TaskConfig(jobModel.getConfig)
     val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap())
     checkpointManager match {
-      case Some(checkpointManager) => checkpointManager.createResources
+      case Some(checkpointManager) => {
+        checkpointManager.createResources
+        checkpointManager.stop
+      }
       case _ => assert(checkpointManager != null, "No checkpoint manager factory configured")
     }
 
@@ -323,6 +333,7 @@ object TestTask {
 abstract class TestTask extends StreamTask with InitableTask {
   var received = ArrayBuffer[String]()
   val initFinished = new CountDownLatch(1)
+  val eventProcessed = new CountDownLatch(1)
   @volatile var gotMessage = new CountDownLatch(1)
 
   def init(config: Config, context: TaskContext) {
@@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask {
   def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
     val msg = envelope.getMessage.asInstanceOf[String]
 
+    eventProcessed.countDown()
+
     System.err.println("TestTask.process(): %s" format msg)
 
     received += msg

http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index a42433c..ccb7cd4 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -77,20 +77,16 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     val (job, task) = startJob
 
     // Validate that restored is empty.
-    assertEquals(0, task.initFinished.getCount)
     assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size)
     assertEquals(0, task.received.size)
 
     // Send some messages to input stream.
-    System.out.println("************************BEFORE DONE sending")
     send(task, "1")
-    System.out.println("************************FIRST DONE sending")
     send(task, "2")
     send(task, "3")
     send(task, "2")
     send(task, "99")
     send(task, "99")
-    System.out.println("************************DONE sending")
     stopJob(job)
 
   }
@@ -122,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask {
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
-    System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored)
+    System.out.println("ShutdownStateStoreTask.createStream(): %s" format restored)
     iter.close
   }