You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/04/10 00:01:39 UTC
samza git commit: SAMZA-540: Expose checkpointed offset in OffsetManager
Repository: samza
Updated Branches:
refs/heads/master 9427c9e17 -> 6946f78e1
SAMZA-540: Expose checkpointed offset in OffsetManager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6946f78e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6946f78e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6946f78e
Branch: refs/heads/master
Commit: 6946f78e1a7066ed58587cc9e2105128e9d8fb51
Parents: 9427c9e
Author: Renato Marroquín Mogrovejo <re...@gmail.com>
Authored: Thu Apr 9 15:00:38 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Thu Apr 9 15:00:38 2015 -0700
----------------------------------------------------------------------
docs/startup/hello-samza/versioned/index.md | 2 +-
.../apache/samza/checkpoint/OffsetManager.scala | 17 +++++++---
.../samza/checkpoint/OffsetManagerMetrics.scala | 34 ++++++++++++++++++++
.../apache/samza/container/SamzaContainer.scala | 4 ++-
.../samza/checkpoint/TestOffsetManager.scala | 26 +++++++++++++++
5 files changed, 77 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/docs/startup/hello-samza/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/hello-samza/versioned/index.md b/docs/startup/hello-samza/versioned/index.md
index 507ceb5..b4ab23a 100644
--- a/docs/startup/hello-samza/versioned/index.md
+++ b/docs/startup/hello-samza/versioned/index.md
@@ -53,7 +53,7 @@ Before you can run a Samza job, you need to build a package for it. This package
{% highlight bash %}
mvn clean package
mkdir -p deploy/samza
-tar -xvf ./target/hello-samza-0.9.0-dist.tar.gz -C deploy/samza
+tar -xvf ./target/hello-samza-0.10.0-dist.tar.gz -C deploy/samza
{% endhighlight %}
### Run a Samza Job
http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index a40c87f..85c1749 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -71,7 +71,8 @@ object OffsetManager extends Logging {
systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
config: Config,
checkpointManager: CheckpointManager = null,
- systemAdmins: Map[String, SystemAdmin] = Map()) = {
+ systemAdmins: Map[String, SystemAdmin] = Map(),
+ offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
debug("Building offset manager for %s." format systemStreamMetadata)
@@ -98,7 +99,7 @@ object OffsetManager extends Logging {
// Build OffsetSetting so we can create a map for OffsetManager.
(systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
}.toMap
- new OffsetManager(offsetSettings, checkpointManager, systemAdmins)
+ new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
}
}
@@ -136,7 +137,12 @@ class OffsetManager(
* SystemAdmins that are used to get next offsets from last checkpointed
* offsets. Map is from system name to SystemAdmin class for the system.
*/
- val systemAdmins: Map[String, SystemAdmin] = Map()) extends Logging {
+ val systemAdmins: Map[String, SystemAdmin] = Map(),
+
+ /**
+ * offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
+ */
+ val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics ) extends Logging {
/**
* Last offsets processed for each SystemStreamPartition.
@@ -158,6 +164,8 @@ class OffsetManager(
def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
+ // register metrics
+ systemStreamPartitions.foreach{ case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
}
def start {
@@ -204,6 +212,7 @@ class OffsetManager(
val partitionOffsets = lastProcessedOffsets.filterKeys(sspsForTaskName.contains(_))
checkpointManager.writeCheckpoint(taskName, new Checkpoint(partitionOffsets))
+ lastProcessedOffsets.foreach{ case (ssp, checkpoint) => offsetManagerMetrics.checkpointedOffsets(ssp).set(checkpoint) }
} else {
debug("Skipping checkpointing for taskName %s because no checkpoint manager is defined." format taskName)
}
@@ -225,7 +234,6 @@ class OffsetManager(
private def registerCheckpointManager {
if (checkpointManager != null) {
debug("Registering checkpoint manager.")
-
systemStreamPartitions.keys.foreach(checkpointManager.register)
} else {
debug("Skipping checkpoint manager registration because no manager was defined.")
@@ -249,6 +257,7 @@ class OffsetManager(
if (!shouldKeep) {
info("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream." format (offset, systemStreamPartition))
}
+ info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
shouldKeep
}
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
new file mode 100644
index 0000000..b6219ca
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManagerMetrics.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint
+
+import org.apache.samza.metrics._
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.samza.system.SystemStreamPartition
+
+class OffsetManagerMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper {
+
+ val checkpointedOffsets = new ConcurrentHashMap[SystemStreamPartition, Gauge[String]]
+
+ def addCheckpointedOffset(systemStreamPartition: SystemStreamPartition, checkpointedOffset: String) {
+ checkpointedOffsets.put(systemStreamPartition, newGauge("%s-%s-%d-checkpointed-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), checkpointedOffset))
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 66640fe..720fbdc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -61,6 +61,7 @@ import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.config.JobConfig.Config2Job
import java.lang.Thread.UncaughtExceptionHandler
import org.apache.samza.serializers._
+import org.apache.samza.checkpoint.OffsetManagerMetrics
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -147,6 +148,7 @@ object SamzaContainer extends Logging {
val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry)
val systemProducersMetrics = new SystemProducersMetrics(registry)
val systemConsumersMetrics = new SystemConsumersMetrics(registry)
+ val offsetManagerMetrics = new OffsetManagerMetrics(registry)
val inputSystemStreamPartitions = containerModel
.getTasks
@@ -336,7 +338,7 @@ object SamzaContainer extends Logging {
info("Got checkpoint manager: %s" format checkpointManager)
- val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins)
+ val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
info("Got offset manager: %s" format offsetManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/6946f78e/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index d18d4c4..a281e79 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -87,6 +87,32 @@ class TestOffsetManager {
}
@Test
+ def testGetCheckpointedOffsetMetric{
+ val taskName = new TaskName("c")
+ val systemStream = new SystemStream("test-system", "test-stream")
+ val partition = new Partition(0)
+ val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+ val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
+ val config = new MapConfig
+ val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
+ val systemAdmins = Map("test-system" -> getSystemAdmin)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ offsetManager.register(taskName, Set(systemStreamPartition))
+ offsetManager.start
+ // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
+ offsetManager.checkpoint(taskName)
+ assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+ offsetManager.update(systemStreamPartition, "46")
+ offsetManager.update(systemStreamPartition, "47")
+ offsetManager.checkpoint(taskName)
+ assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+ offsetManager.update(systemStreamPartition, "48")
+ offsetManager.checkpoint(taskName)
+ assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
+ }
+
+ @Test
def testShouldResetStreams {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")