You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/09 01:07:08 UTC

[samza] branch 1.1.0 updated (ca95f27 -> a9aa1c7)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a change to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git.


    from ca95f27  Update Samza version for 1.1.0 release branch (#943)
     new 655d60f  Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.
     new 68a6cf4  SEP-19: Removing standbytasks.enabled config
     new a9aa1c7  [javadoc] Clarify semantics of the offset in IncomingMessageEnvelope as the offset provided by the system consumer for the message (#945)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/samza/system/IncomingMessageEnvelope.java    |  9 ++++++---
 .../main/scala/org/apache/samza/config/JobConfig.scala  |  5 ++---
 .../org/apache/samza/container/SamzaContainer.scala     |  9 +++++----
 .../apache/samza/container/SamzaContainerMetrics.scala  |  4 ++--
 .../scala/org/apache/samza/container/TaskInstance.scala |  7 -------
 .../apache/samza/storage/ContainerStorageManager.java   | 17 +++++++++--------
 .../apache/samza/system/SystemConsumersMetrics.scala    |  7 +++++--
 .../java/org/apache/samza/task/TestAsyncRunLoop.java    |  1 -
 .../org/apache/samza/container/TestSamzaContainer.scala |  1 +
 .../samza/processor/StreamProcessorTestUtils.scala      |  1 +
 10 files changed, 31 insertions(+), 30 deletions(-)


[samza] 02/03: SEP-19: Removing standbytasks.enabled config

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 68a6cf41817af7dd22aee5f2bcdce0f62b044901
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Fri Mar 8 15:09:14 2019 -0800

    SEP-19: Removing standbytasks.enabled config
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: Jagadish <ja...@apache.org>
    
    Closes #946 from rmatharu/removing-standby-config
---
 samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index a84c5b8..64235cf 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -111,9 +111,8 @@ object JobConfig {
   val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
 
   // Enables standby tasks
-  val STANDBY_TASKS_ENABLED = "job.standbytasks.enabled"
   val STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor"
-  val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 2
+  val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1
 
   // Specify DiagnosticAppender class
   val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
@@ -274,7 +273,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     get(JobConfig.SYSTEM_STREAM_PARTITION_MAPPER_FACTORY, classOf[HashSystemStreamPartitionMapperFactory].getName)
   }
 
-  def getStandbyTasksEnabled = getBoolean(JobConfig.STANDBY_TASKS_ENABLED, false)
+  def getStandbyTasksEnabled = getStandbyTaskReplicationFactor > 1
 
   def getStandbyTaskReplicationFactor = getInt(JobConfig.STANDBY_TASKS_REPLICATION_FACTOR, JobConfig.DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR)
 }


[samza] 01/03: Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 655d60f5ed6a8b3cc6bace6b535744373736945a
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Fri Mar 8 12:21:20 2019 -0800

    Bugfix: Recent CSM refactor was causing some metrics to not be emitted. Fixed <task>-restore-time metric.
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: prateekm
    
    Closes #942 from rmatharu/test-metricsfix and squashes the following commits:
    
    c5a072f4 [Ray Matharu] minor fix
    3c1e25ad [Ray Matharu] minor
    b13b485a [Ray Matharu] minor
    7e60ad8b [Ray Matharu] minor
    7634a470 [Ray Matharu] removing CSM's registerMetrics
    78ee37e1 [Ray Matharu] removing unused imports
    f207f03d [Ray Matharu] minor
    8aec4fa7 [Ray Matharu] minor
    0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM
    8bd3b19f [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    6b58c862 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    89be3652 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    6fe29268 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    2c09b081 [Ray Matharu] Rocksdb bug fix
---
 .../org/apache/samza/container/SamzaContainer.scala     |  9 +++++----
 .../apache/samza/container/SamzaContainerMetrics.scala  |  4 ++--
 .../scala/org/apache/samza/container/TaskInstance.scala |  7 -------
 .../apache/samza/storage/ContainerStorageManager.java   | 17 +++++++++--------
 .../apache/samza/system/SystemConsumersMetrics.scala    |  7 +++++--
 .../java/org/apache/samza/task/TestAsyncRunLoop.java    |  1 -
 .../org/apache/samza/container/TestSamzaContainer.scala |  1 +
 .../samza/processor/StreamProcessorTestUtils.scala      |  1 +
 8 files changed, 23 insertions(+), 24 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5df4678..ab89396 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -594,7 +594,6 @@ object SamzaContainer extends Logging {
           offsetManager = offsetManager,
           storageManager = storageManager,
           tableManager = tableManager,
-          reporters = reporters,
           systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
           jobModel = jobModel,
@@ -663,6 +662,7 @@ object SamzaContainer extends Logging {
     new SamzaContainer(
       config = config,
       taskInstances = taskInstances,
+      taskInstanceMetrics = taskInstanceMetrics,
       runLoop = runLoop,
       systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
@@ -700,6 +700,7 @@ object SamzaContainer extends Logging {
 class SamzaContainer(
   config: Config,
   taskInstances: Map[TaskName, TaskInstance],
+  taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics],
   runLoop: Runnable,
   systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
@@ -879,9 +880,9 @@ class SamzaContainer(
   }
 
   def startMetrics {
-    info("Registering task instances with metrics.")
-
-    taskInstances.values.foreach(_.registerMetrics)
+    info("Registering task instance metrics.")
+    reporters.values.foreach(reporter =>
+      taskInstanceMetrics.values.foreach(taskMetrics => reporter.register(taskMetrics.source, taskMetrics.registry)))
 
     info("Starting JVM metrics.")
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d5cf6c6..326156b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -51,8 +51,8 @@ class SamzaContainerMetrics(
 
   val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
-  def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
-    taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
+  def addStoresRestorationGauge(taskName: TaskName) {
+    taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L))
   }
 
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0c8102b..fa17f24 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -52,7 +52,6 @@ class TaskInstance(
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   tableManager: TableManager = null,
-  reporters: Map[String, MetricsReporter] = Map(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
@@ -105,12 +104,6 @@ class TaskInstance(
 
   val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
 
-  def registerMetrics {
-    debug("Registering metrics for taskName: %s" format taskName)
-
-    reporters.values.foreach(_.register(metrics.source, metrics.registry))
-  }
-
   def registerOffsets {
     debug("Registering offsets for taskName: %s" format taskName)
     offsetManager.register(taskName, systemStreamPartitions)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index ad9637d..da61a35 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -113,7 +113,8 @@ public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
   private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
-  private static final String SIDEINPUTS_METRICS_NAME = "samza-container-%s-sideinputs";
+  private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+  // We use a prefix to differentiate the SystemConsumersMetrics for side-inputs from the ones in SamzaContainer
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -215,7 +216,7 @@ public class ContainerStorageManager {
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
     // creating task restore managers
-    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, this.samzaContainerMetrics);
 
     // create side input storage managers
     sideInputStorageManagers = createSideInputStorageManagers(clock);
@@ -229,15 +230,15 @@ public class ContainerStorageManager {
       scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
           this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(), false);
 
-      SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics(
-          new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, containerModel.getId())));
+      SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
+      // we use the same registry as samza-container-metrics
 
       MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
-          systemConsumersMetrics.registry(), systemAdmins);
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
 
       sideInputSystemConsumers =
           new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
-              systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
               SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
 
@@ -336,11 +337,12 @@ public class ContainerStorageManager {
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
+  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
     containerModel.getTasks().forEach((taskName, taskModel) -> {
         taskRestoreManagers.put(taskName,
             new TaskRestoreManager(taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins, clock));
+        samzaContainerMetrics.addStoresRestorationGauge(taskName);
       });
     return taskRestoreManagers;
   }
@@ -573,7 +575,6 @@ public class ContainerStorageManager {
     return this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
-
   public void start() throws SamzaException {
     restoreStores();
     if (sideInputsPresent()) {
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 43d381b..afdce08 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -19,12 +19,13 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.ReadableMetricsRegistry
 
-class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new MetricsRegistryMap,
+  val prefix: String = "") extends MetricsHelper {
   val choseNull = newCounter("chose-null")
   val choseObject = newCounter("chose-object")
   val deserializationError = newCounter("deserialization error")
@@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
     }
   }
 
+  override def getPrefix: String = prefix
+
   def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) {
     systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId))
   }
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index acaecdb..48f8619 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -97,7 +97,6 @@ public class TestAsyncRunLoop {
         manager,
         null,
         null,
-        null,
         sspSet,
         new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()),
         null,
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d1f60bc..e75fe54 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     this.samzaContainer = new SamzaContainer(
       this.config,
       Map(TASK_NAME -> this.taskInstance),
+      Map(TASK_NAME -> new TaskInstanceMetrics),
       this.runLoop,
       this.systemAdmins,
       this.consumerMultiplexer,
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 9bb485a..3ff651b 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -63,6 +63,7 @@ object StreamProcessorTestUtils {
     val container = new SamzaContainer(
       config = config,
       taskInstances = Map(taskName -> taskInstance),
+      taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics),
       runLoop = mockRunloop,
       systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,


[samza] 03/03: [javadoc] Clarify semantics of the offset in IncomingMessageEnvelope as the offset provided by the system consumer for the message (#945)

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit a9aa1c752dbe99c825513f6fdfc960a5b311f64e
Author: cameronlee314 <37...@users.noreply.github.com>
AuthorDate: Fri Mar 8 15:16:08 2019 -0800

    [javadoc] Clarify semantics of the offset in IncomingMessageEnvelope as the offset provided by the system consumer for the message (#945)
---
 .../java/org/apache/samza/system/IncomingMessageEnvelope.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index ae8335e..7392253 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -46,7 +46,7 @@ public class IncomingMessageEnvelope {
    * Constructs a new IncomingMessageEnvelope from specified components.
    * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
    * from which the stream came, and the partition of the stream from which the message was received.
-   * @param offset The offset in the partition that the message was received from.
+   * @param offset offset associated with this message, provided by the system consumer that consumed the message
    * @param key A deserialized key received from the partition offset.
    * @param message A deserialized message received from the partition offset.
    */
@@ -58,7 +58,7 @@ public class IncomingMessageEnvelope {
    * Constructs a new IncomingMessageEnvelope from specified components.
    * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
    * from which the stream came, and the partition of the stream from which the message was received.
-   * @param offset The offset in the partition that the message was received from.
+   * @param offset offset associated with this message, provided by the system consumer that consumed the message
    * @param key A deserialized key received from the partition offset.
    * @param message A deserialized message received from the partition offset.
    * @param size size of the message and key in bytes.
@@ -77,7 +77,7 @@ public class IncomingMessageEnvelope {
    * Constructs a new IncomingMessageEnvelope from specified components
    * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
    * from which the stream came, and the partition of the stream from which the message was received.
-   * @param offset The offset in the partition that the message was received from.
+   * @param offset offset associated with this message, provided by the system consumer that consumed the message
    * @param key A deserialized key received from the partition offset.
    * @param message A deserialized message received from the partition offset.
    * @param size size of the message and key in bytes.
@@ -111,6 +111,9 @@ public class IncomingMessageEnvelope {
     return systemStreamPartition;
   }
 
+  /**
+   * Offset associated with this message, provided by the system consumer that consumed the message.
+   */
   public String getOffset() {
     return offset;
   }