You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/04/10 00:01:39 UTC

samza git commit: SAMZA-540: Expose checkpointed offset in OffsetManager

Repository: samza
Updated Branches:
  refs/heads/master 9427c9e17 -> 6946f78e1


SAMZA-540: Expose checkpointed offset in OffsetManager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6946f78e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6946f78e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6946f78e

Branch: refs/heads/master
Commit: 6946f78e1a7066ed58587cc9e2105128e9d8fb51
Parents: 9427c9e
Author: Renato Marroquín Mogrovejo <re...@gmail.com>
Authored: Thu Apr 9 15:00:38 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Thu Apr 9 15:00:38 2015 -0700

----------------------------------------------------------------------
 docs/startup/hello-samza/versioned/index.md     |  2 +-
 .../apache/samza/checkpoint/OffsetManager.scala | 17 +++++++---
 .../samza/checkpoint/OffsetManagerMetrics.scala | 34 ++++++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  4 ++-
 .../samza/checkpoint/TestOffsetManager.scala    | 26 +++++++++++++++
 5 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/docs/startup/hello-samza/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/hello-samza/versioned/index.md b/docs/startup/hello-samza/versioned/index.md
index 507ceb5..b4ab23a 100644
--- a/docs/startup/hello-samza/versioned/index.md
+++ b/docs/startup/hello-samza/versioned/index.md
@@ -53,7 +53,7 @@ Before you can run a Samza job, you need to build a package for it. This package
 {% highlight bash %}
 mvn clean package
 mkdir -p deploy/samza
-tar -xvf ./target/hello-samza-0.9.0-dist.tar.gz -C deploy/samza
+tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza
 {% endhighlight %}
 
 ### Run a Samza Job

http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index a40c87f..85c1749 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -71,7 +71,8 @@ object OffsetManager extends Logging {
     systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
     config: Config,
     checkpointManager: CheckpointManager = null,
-    systemAdmins: Map[String, SystemAdmin] = Map()) = {
+    systemAdmins: Map[String, SystemAdmin] = Map(),
+    offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
 
     debug("Building offset manager for %s." format systemStreamMetadata)
 
@@ -98,7 +99,7 @@ object OffsetManager extends Logging {
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-    new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
+    new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
   }
 }
 
@@ -136,7 +137,12 @@ class OffsetManager(
    * SystemAdmins that are used to get next offsets from last checkpointed
    * offsets. Map is from system name to SystemAdmin class for the system.
    */
-  val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
+  val systemAdmins: Map[String, SystemAdmin] = Map(),
+
+  /**
+   * Metrics holding one checkpointed-offset gauge per registered SystemStreamPartition.
+   */
+  val offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) extends Logging {
 
   /**
    * Last offsets processed for each SystemStreamPartition.
@@ -158,6 +164,8 @@ class OffsetManager(
 
   def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
     systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
+    // Add an empty checkpointed-offset gauge only for the SSPs registered by this call.
+    systemStreamPartitionsToRegister.foreach(ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, ""))
   }
 
   def start {
@@ -204,6 +212,7 @@ class OffsetManager(
       val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_))
 
       checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
+      partitionOffsets.foreach { case (ssp, offset) => offsetManagerMetrics.checkpointedOffsets(ssp).set(offset) } // only the offsets just checkpointed for taskName
     } else {
       debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName)
     }
@@ -225,7 +234,6 @@ class OffsetManager(
   private def registerCheckpointManager {
     if (checkpointManager != null) {
       debug("Registering checkpoint manager.")
-
       systemStreamPartitions.keys.foreach(checkpointManager.register)
     } else {
       debug("Skipping checkpoint manager registration because no manager was defined.")
@@ -249,6 +257,7 @@ class OffsetManager(
             if (!shouldKeep) {
               info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
             }
+            if (shouldKeep) info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
             shouldKeep
         }
     } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
new file mode 100644
index 0000000..b6219ca
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.apache.samza.metrics._
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.samza.system.SystemStreamPartition
+/** Metrics for OffsetManager: one String gauge per SystemStreamPartition, created with newGauge from MetricsHelper. */
+class OffsetManagerMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+  /** Checkpointed-offset gauges keyed by SystemStreamPartition; OffsetManager updates their values with `set`. */
+  val checkpointedOffsets = new ConcurrentHashMap[SystemStreamPartition, Gauge[String]]
+  /** Creates a gauge named "<system>-<stream>-<partitionId>-checkpointed-offset" with the given initial value and stores it for the SSP, replacing any existing entry. */
+  def addCheckpointedOffset(systemStreamPartition: SystemStreamPartition, checkpointedOffset: String) {
+    checkpointedOffsets.put(systemStreamPartition, newGauge("%s-%s-%d-checkpointed-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), checkpointedOffset))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 66640fe..720fbdc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -61,6 +61,7 @@ import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
 import org.apache.samza.serializers._
+import org.apache.samza.checkpoint.OffsetManagerMetrics
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -147,6 +148,7 @@ object SamzaContainer extends Logging {
     val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
     val systemProducersMetrics = new SystemProducersMetrics(registry)
     val systemConsumersMetrics = new SystemConsumersMetrics(registry)
+    val offsetManagerMetrics = new OffsetManagerMetrics(registry)
 
     val inputSystemStreamPartitions = containerModel
       .getTasks
@@ -336,7 +338,7 @@ object SamzaContainer extends Logging {
 
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
 
     info("Got offset manager: %s" format offsetManager)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index d18d4c4..a281e79 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -87,6 +87,32 @@ class TestOffsetManager {
   }
 
   @Test
+  def testGetCheckpointedOffsetMetric {
+    val taskName = new TaskName("c")
+    val systemStream = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
+    val systemAdmins = Map("test-system" -> getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins)
+    offsetManager.register(taskName, Set(systemStreamPartition))
+    offsetManager.start
+    // The gauge reads "" until the first checkpoint, which publishes the restored last-processed offset (45).
+    assertEquals("", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+    offsetManager.checkpoint(taskName)
+    assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+    offsetManager.update(systemStreamPartition, "46")
+    offsetManager.update(systemStreamPartition, "47")
+    offsetManager.checkpoint(taskName)
+    assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+    offsetManager.update(systemStreamPartition, "48")
+    offsetManager.checkpoint(taskName)
+    assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+  }
+
+  @Test
   def testShouldResetStreams {
     val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")