You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/18 23:01:22 UTC

[25/47] samza git commit: added JobModelManager to ThreadJob

added JobModelManager to ThreadJob


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/22034947
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/22034947
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/22034947

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 22034947b998d3604bc3911a417b9c1e761bb90f
Parents: c14557f
Author: Boris S <bo...@apache.org>
Authored: Fri Aug 31 14:36:51 2018 -0700
Committer: Boris S <bo...@apache.org>
Committed: Fri Aug 31 14:36:51 2018 -0700

----------------------------------------------------------------------
 .../stream/CoordinatorStreamSystemConsumer.java |   4 +-
 .../org/apache/samza/job/local/ThreadJob.scala  |   5 +-
 .../samza/job/local/ThreadJobFactory.scala      |   2 +-
 .../apache/samza/job/local/TestThreadJob.scala  |   9 ++
 .../system/kafka/NewKafkaSystemConsumer.java    | 121 +++++++++----------
 .../integration/TestShutdownStatefulTask.scala  |   4 +-
 6 files changed, 75 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index 38255a2..0bdb874 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -176,7 +176,7 @@ public class CoordinatorStreamSystemConsumer {
             valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
           }
           CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
-          log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+          log.info("Received coordinator stream message: {}", coordinatorStreamMessage);
           // Remove any existing entry. Set.add() does not add if the element already exists.
           if (bootstrappedMessages.remove(coordinatorStreamMessage)) {
             log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
@@ -194,7 +194,7 @@ public class CoordinatorStreamSystemConsumer {
         }
 
         bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages);
-        log.debug("Bootstrapped configuration: {}", configMap);
+        log.info("Bootstrapped configuration: {}", configMap);
         isBootstrapped = true;
       } catch (Exception e) {
         throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
index a61a297..33dde52 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala
@@ -19,11 +19,12 @@
 
 package org.apache.samza.job.local
 
+import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.util.Logging
 
-class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
+class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging {
   @volatile var jobStatus: Option[ApplicationStatus] = None
   var thread: Thread = null
 
@@ -43,6 +44,8 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging {
             jobStatus = Some(UnsuccessfulFinish)
             throw e
           }
+        } finally {
+          jobModelManager.stop
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 0b472aa..4b08721 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -110,7 +110,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         taskFactory)
       container.setContainerListener(containerListener)
 
-      val threadJob = new ThreadJob(container)
+      val threadJob = new ThreadJob(container, coordinator)
       threadJob
     } finally {
       coordinator.stop

http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index 4f3f511..b1de215 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.local
 
+import org.apache.samza.coordinator.JobModelManager
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -29,6 +30,10 @@ class TestThreadJob {
     val job = new ThreadJob(new Runnable {
       override def run {
       }
+    }, new JobModelManager(null) {
+      override def stop: Unit = {
+
+      }
     })
     job.submit
     job.waitForFinish(999999)
@@ -40,6 +45,10 @@ class TestThreadJob {
       override def run {
         Thread.sleep(999999)
       }
+    }, new JobModelManager(null) {
+      override def stop: Unit = {
+
+      }
     })
     job.submit
     job.waitForFinish(500)

http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
index b745628..e34812f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java
@@ -1,3 +1,4 @@
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,47 +22,38 @@
 
 package org.apache.samza.system.kafka;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 import kafka.common.TopicAndPartition;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.KafkaConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlockingEnvelopeMap;
 import org.apache.samza.util.Clock;
-import org.apache.samza.util.KafkaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
-import scala.collection.JavaConversions;
 
 
-public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements SystemConsumer{
+public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
 
   private static final Logger LOG = LoggerFactory.getLogger(NewKafkaSystemConsumer.class);
 
   private static final long FETCH_THRESHOLD = 50000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;
-  private final Consumer<K,V> kafkaConsumer;
+  private final Consumer<K, V> kafkaConsumer;
   private final String systemName;
   private final KafkaSystemConsumerMetrics samzaConsumerMetrics;
   private final String clientId;
@@ -78,8 +70,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
   private KafkaConsumerProxy proxy;
 
   /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>();
-  /* package private */long perPartitionFetchThreshold;
-  /* package private */long perPartitionFetchThresholdBytes;
+  /* package private */ long perPartitionFetchThreshold;
+  /* package private */ long perPartitionFetchThresholdBytes;
 
   // TODO - consider new class for KafkaSystemConsumerMetrics
 
@@ -88,15 +80,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
    * @param config
    * @param metrics
    */
-  public NewKafkaSystemConsumer(
-      Consumer<K,V> kafkaConsumer,
-      String systemName,
-      Config config,
-      String clientId,
-      KafkaSystemConsumerMetrics metrics,
-      Clock clock) {
+  protected NewKafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId,
+      KafkaSystemConsumerMetrics metrics, Clock clock) {
 
-    super(metrics.registry(),clock, metrics.getClass().getName());
+    super(metrics.registry(), clock, metrics.getClass().getName());
 
     this.samzaConsumerMetrics = metrics;
     this.clientId = clientId;
@@ -109,26 +96,20 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName);
 
     LOG.info(String.format(
-        "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with liKafkaConsumer=%s",
-        systemName, clientId, metricName, this.kafkaConsumer.toString()));
+        "Created SamzaKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with KafkaConsumer=%s", systemName,
+        clientId, metricName, this.kafkaConsumer.toString()));
   }
 
-  public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(
-      String systemName,
-      Config config,
-      String clientId,
-      KafkaSystemConsumerMetrics metrics,
-      Clock clock) {
+  public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config,
+      String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) {
+
+    System.out.println("GETTING FOR " + systemName);
 
+    System.out.printf("RETURNING NEW ONE");
     // extract consumer configs and create kafka consumer
     KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config);
 
-    return new NewKafkaSystemConsumer(kafkaConsumer,
-        systemName,
-        config,
-        clientId,
-        metrics,
-        clock);
+    return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock);
   }
 
   /**
@@ -146,7 +127,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     KafkaConsumerConfig consumerConfig =
         KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps);
 
-    LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, consumerConfig.originals());
+    LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}",
+        systemName, consumerConfig.originals());
 
     return new KafkaConsumer<>(consumerConfig.originals());
   }
@@ -157,7 +139,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
       LOG.warn("attempting to start the consumer for the second (or more) time.");
       return;
     }
-    if(stopped.get()) {
+    if (stopped.get()) {
       LOG.warn("attempting to start a stopped consumer");
       return;
     }
@@ -197,8 +179,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     messageSink = new KafkaConsumerMessageSink();
 
     // create the thread with the consumer
-    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink,
-        samzaConsumerMetrics, metricName);
+    proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName);
 
     LOG.info("==============>Created consumer proxy: " + proxy);
   }
@@ -231,8 +212,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
       proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset);
     });
 
+    System.out.println("#####################started " + this + "; kc=" + kafkaConsumer);
     // start the proxy thread
     if (proxy != null && !proxy.isRunning()) {
+      System.out.println("#####################starting proxy " + proxy);
       proxy.start();
     }
   }
@@ -242,33 +225,37 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     KafkaConfig kafkaConfig = new KafkaConfig(config);
     Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName);
     long fetchThreshold = FETCH_THRESHOLD;
-    if(fetchThresholdOption.isDefined()) {
+    if (fetchThresholdOption.isDefined()) {
       fetchThreshold = Long.valueOf(fetchThresholdOption.get());
       LOG.info("fetchThresholdOption is defined. fetchThreshold=" + fetchThreshold);
     }
     Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName);
     long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
-    if(fetchThresholdBytesOption.isDefined()) {
+    if (fetchThresholdBytesOption.isDefined()) {
       fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get());
       LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + fetchThresholdBytes);
     }
     LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold);
-    LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" + topicPartitions2SSP.size());
+    LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #="
+        + topicPartitions2SSP.size());
 
     if (topicPartitions2SSP.size() > 0) {
       perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size();
       LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold);
-      if(fetchThresholdBytesEnabled) {
+      if (fetchThresholdBytesEnabled) {
         // currently this feature cannot be enabled, because we do not have the size of the messages available.
         // messages get double buffered, hence divide by 2
         perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitions2SSP.size();
-        LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes);
+        LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes="
+            + perPartitionFetchThresholdBytes);
       }
     }
   }
 
   @Override
   public void stop() {
+    System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer);
+
     if (!stopped.compareAndSet(false, true)) {
       LOG.warn("attempting to stop stopped consumer.");
       return;
@@ -276,8 +263,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
 
     LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this);
     // stop the proxy (with 5 minutes timeout)
-    if(proxy != null)
+    if (proxy != null) {
+      System.out.println("##################### stopping proxy " + proxy);
       proxy.stop(TimeUnit.MINUTES.toMillis(5));
+    }
 
     try {
       synchronized (kafkaConsumer) {
@@ -293,6 +282,14 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    if (started.get()) {
+      String msg =
+          String.format("Trying to register partition after consumer has been started. sn=%s, ssp=%s", systemName,
+              systemStreamPartition);
+      LOG.error(msg);
+      throw new SamzaException(msg);
+    }
+
     if (!systemStreamPartition.getSystem().equals(systemName)) {
       LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName);
       return;
@@ -332,16 +329,17 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
 
   @Override
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
-      Set<SystemStreamPartition> systemStreamPartitions, long timeout)
-      throws InterruptedException {
+      Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
 
     // check if the proxy is running
-    if(!proxy.isRunning()) {
+    if (!proxy.isRunning()) {
       stop();
       if (proxy.getFailureCause() != null) {
         String message = "LiKafkaConsumerProxy has stopped";
-        if(proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException)
-          message += " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic";
+        if (proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) {
+          message +=
+              " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic";
+        }
         throw new SamzaException(message, proxy.getFailureCause());
       } else {
         LOG.warn("Failure cause not populated for LiKafkaConsumerProxy");
@@ -349,7 +347,9 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
       }
     }
 
-    return super.poll(systemStreamPartitions, timeout);
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout);
+    LOG.info("=============================>. Res in POLL:" + res.toString());
+    return res;
   }
 
   public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
@@ -376,15 +376,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     return systemName;
   }
 
-  private static Set<SystemStream> getIntermediateStreams(Config config) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    Collection<String> streamIds = JavaConversions.asJavaCollection(streamConfig.getStreamIds());
-    return streamIds.stream()
-        .filter(streamConfig::getIsIntermediateStream)
-        .map(id -> streamConfig.streamIdToSystemStream(id))
-        .collect(Collectors.toSet());
-  }
-
   ////////////////////////////////////
   // inner class for the message sink
   ////////////////////////////////////
@@ -395,10 +386,11 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
     }
 
     boolean needsMoreMessages(SystemStreamPartition ssp) {
-      if(LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};"
-                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes,
-            getNumMessagesInQueue(ssp), perPartitionFetchThreshold);
+                + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled,
+            getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp),
+            perPartitionFetchThreshold);
       }
 
       if (fetchThresholdBytesEnabled) {
@@ -415,8 +407,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements
         put(ssp, envelope);
       } catch (InterruptedException e) {
         throw new SamzaException(
-            String.format("Interrupted while trying to add message with offset %s for ssp %s",
-                envelope.getOffset(),
+            String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(),
                 ssp));
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index e4d47d1..a42433c 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -82,13 +82,15 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil {
     assertEquals(0, task.received.size)
 
     // Send some messages to input stream.
+    System.out.println("************************BEFORE DONE sending")
     send(task, "1")
+    System.out.println("************************FIRST DONE sending")
     send(task, "2")
     send(task, "3")
     send(task, "2")
     send(task, "99")
     send(task, "99")
-
+    System.out.println("************************DONE sending")
     stopJob(job)
 
   }