You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/04/07 23:00:35 UTC
git commit: SAMZA-223: Add a cache for stream metadata. Reviewed by
Chris Riccomini and Jakob Homan.
Repository: incubator-samza
Updated Branches:
refs/heads/master b4a323d16 -> 4b8514f22
SAMZA-223: Add a cache for stream metadata. Reviewed by Chris Riccomini and Jakob Homan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4b8514f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4b8514f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4b8514f2
Branch: refs/heads/master
Commit: 4b8514f226cc75ca663f81954f4683f61638a80b
Parents: b4a323d
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Fri Apr 4 22:23:42 2014 +0100
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Mon Apr 7 14:00:01 2014 -0700
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 25 +---
.../samza/system/StreamMetadataCache.scala | 91 ++++++++++++++
.../org/apache/samza/util/SystemClock.scala | 32 +++++
.../main/scala/org/apache/samza/util/Util.scala | 35 +++---
.../samza/container/TestSamzaContainer.scala | 3 +-
.../samza/system/TestStreamMetadataCache.scala | 120 +++++++++++++++++++
6 files changed, 264 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ef14643..24364f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -63,6 +63,7 @@ import scala.collection.JavaConversions._
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.system.StreamMetadataCache
object SamzaContainer extends Logging {
def main(args: Array[String]) {
@@ -122,7 +123,8 @@ object SamzaContainer extends Logging {
info("Got system factories: %s" format systemFactories.keys)
- val inputStreamMetadata = getStreamMetadata(inputStreams.map(_.getSystemStream), systemAdmins)
+ val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+ val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputStreams.map(_.getSystemStream))
info("Got input stream metadata: %s" format inputStreamMetadata)
@@ -224,7 +226,7 @@ object SamzaContainer extends Logging {
info("Got change log system streams: %s" format changeLogSystemStreams)
- val changeLogMetadata = getStreamMetadata(changeLogSystemStreams.values.toSet, systemAdmins)
+ val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
info("Got change log stream metadata: %s" format changeLogMetadata)
@@ -445,25 +447,6 @@ object SamzaContainer extends Logging {
}
/**
- * Builds a map from SystemStream to SystemStreamMetadata for all streams
- * using SystemAdmins to fetch metadata.
- */
- def getStreamMetadata(streams: Set[SystemStream], systemAdmins: Map[String, SystemAdmin]): Map[SystemStream, SystemStreamMetadata] = {
- streams
- .groupBy[String](_.getSystem)
- .flatMap {
- case (systemName, systemStreams) =>
- systemAdmins
- .getOrElse(systemName, throw new SamzaException("Unable to find system admin definition for system %s." format systemName))
- .getSystemStreamMetadata(systemStreams.map(_.getStream))
- .map {
- case (streamName, metadata) => (new SystemStream(systemName, streamName), metadata)
- }
- }
- .toMap
- }
-
- /**
* Builds a map from SystemStreamPartition to oldest offset for changelogs.
*/
def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
new file mode 100644
index 0000000..aab81e5
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.util.{Clock, SystemClock}
+import scala.collection.JavaConversions._
+
+/**
+ * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
+ * 5 seconds), so that we can make many metadata requests in quick succession without
+ * hammering the actual systems. This is useful for example during task startup, when
+ * each task independently fetches the offsets for its own partition.
+ */
+class StreamMetadataCache (
+ /** System implementations from which the actual metadata is loaded on cache miss */
+ systemAdmins: Map[String, SystemAdmin],
+
+ /** Maximum age (in milliseconds) of a cache entry */
+ val cacheTTLms: Integer = 5000, // NOTE(review): this is java.lang.Integer rather than Int - confirm that is intended
+
+ /** Clock used for determining expiry (for mocking in tests) */
+ clock: Clock = SystemClock.instance) extends Logging {
+
+ private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long) // metadata plus the clock time at which it was cached
+ private var cache = Map[SystemStream, CacheEntry]() // rebound to an updated map on every write (see addToCache)
+ private val lock = new Object // taken by addToCache only; getFromCache reads cache without it
+
+ /**
+ * Returns metadata about each of the given streams (such as first offset, newest
+ * offset, etc). Streams with no cache entry, or whose entry has expired, are fetched from the
+ * matching SystemAdmin, one request per system. Throws SamzaException for unknown systems or streams.
+ */
+ def getStreamMetadata(streams: Set[SystemStream]): Map[SystemStream, SystemStreamMetadata] = {
+ val time = clock.currentTimeMillis // single timestamp per call, used for expiry checks and for new entries
+ val cacheHits = streams.flatMap(stream => getFromCache(stream, time)).toMap // entries still within the TTL
+
+ val cacheMisses = (streams -- cacheHits.keySet)
+ .groupBy[String](_.getSystem)
+ .flatMap {
+ case (systemName, systemStreams) =>
+ systemAdmins
+ .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
+ .getSystemStreamMetadata(systemStreams.map(_.getStream))
+ .map {
+ case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
+ }
+ }
+ .toMap
+
+ val allResults = cacheHits ++ cacheMisses
+ val missing = streams.filter(stream => allResults.getOrElse(stream, null) == null) // absent from the results, or mapped to null
+ if (!missing.isEmpty) {
+ throw new SamzaException("Cannot get metadata for unknown streams: " + missing.mkString(", "))
+ }
+ cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) } // only reached when nothing is missing, so a failed call caches nothing
+ allResults
+ }
+
+ private def getFromCache(stream: SystemStream, now: Long) = {
+ cache.get(stream) match { // NOTE(review): read is not under lock - confirm visibility across threads is acceptable
+ case Some(CacheEntry(metadata, lastRefresh)) =>
+ if (now - lastRefresh > cacheTTLms) None else Some(stream -> metadata) // expired only when strictly older than the TTL
+ case None => None
+ }
+ }
+
+ private def addToCache(stream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
+ lock synchronized {
+ cache += stream -> CacheEntry(metadata, now) // cache is a var, so += rebinds it to a map containing the new entry
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala b/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
new file mode 100644
index 0000000..62253d6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+/**
+ * Default implementation of the Clock interface, which reads the current
+ * time from System.currentTimeMillis.
+ */
+class SystemClock extends Clock {
+ override def currentTimeMillis = System.currentTimeMillis // delegates straight to the JVM system clock
+}
+
+object SystemClock { // companion object exposing one shared SystemClock
+ val instance = new SystemClock // used as the default clock argument of StreamMetadataCache
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 2bd2c1c..1b548fd 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -27,7 +27,7 @@ import org.apache.samza.config.Config
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
import scala.collection.JavaConversions._
-import org.apache.samza.system.{ SystemStreamPartition, SystemFactory, SystemStream }
+import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
import org.codehaus.jackson.map.ObjectMapper
import org.codehaus.jackson.`type`.TypeReference
import java.util
@@ -88,29 +88,24 @@ object Util extends Logging {
val inputSystemStreams = config.getInputStreams
val systemNames = config.getSystemNames.toSet
- systemNames.flatMap(systemName => {
- // Get SystemAdmin for system.
+ // Map the name of each system to the corresponding SystemAdmin
+ val systemAdmins = systemNames.map(systemName => {
val systemFactoryClassName = config
.getSystemFactory(systemName)
.getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
- val systemAdmin = systemFactory.getAdmin(systemName, config)
-
- // Get metadata for every stream that belongs to this system.
- val streamNames = inputSystemStreams
- .filter(_.getSystem.equals(systemName))
- .map(_.getStream)
- val systemStreamMetadata = systemAdmin.getSystemStreamMetadata(streamNames)
-
- // Get a set of all SSPs for every stream that belongs to this system.
- systemStreamMetadata
- .values
- .flatMap(systemStreamMetadata => {
- val streamName = systemStreamMetadata.getStreamName
- val systemStreamPartitionSet = systemStreamMetadata.getSystemStreamPartitionMetadata.keys
- systemStreamPartitionSet.map(new SystemStreamPartition(systemName, streamName, _)).toSet
- })
- })
+ systemName -> systemFactory.getAdmin(systemName, config)
+ }).toMap
+
+ // Get the set of partitions for each SystemStream from the stream metadata
+ new StreamMetadataCache(systemAdmins)
+ .getStreamMetadata(inputSystemStreams)
+ .flatMap { case (systemStream, metadata) =>
+ metadata
+ .getSystemStreamPartitionMetadata
+ .keys
+ .map(new SystemStreamPartition(systemStream, _))
+ }.toSet
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a2d5820..699f6a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -42,6 +42,7 @@ import org.apache.samza.task.ClosableTask
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
import org.apache.samza.system.SystemStream
+import org.apache.samza.system.StreamMetadataCache
class TestSamzaContainer {
@Test
@@ -52,7 +53,7 @@ class TestSamzaContainer {
new SystemStreamPartition("test", "stream2", new Partition(0)),
new SystemStreamPartition("test", "stream2", new Partition(1)))
val systemAdmins = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
- val metadata = SamzaContainer.getStreamMetadata(inputStreams.map(_.getSystemStream).toSet, systemAdmins)
+ val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream).toSet)
assertNotNull(metadata)
assertEquals(2, metadata.size)
val stream1Metadata = metadata(new SystemStream("test", "stream1"))
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
new file mode 100644
index 0000000..264d1b5
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.util.Clock
+import org.junit.{Before, Test}
+import org.mockito.Mockito._
+import org.scalatest.Matchers._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+import scala.collection.JavaConversions._
+
+class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar {
+ def makeMetadata(streamNames: Set[String] = Set("stream"), numPartitions: Int = 4) = { // builds a stream name -> metadata map for stubbing SystemAdmin
+ val partitions = (0 until numPartitions).map(partition => {
+ new Partition(partition) -> new SystemStreamPartitionMetadata("oldest", "newest", "upcoming") // same placeholder offsets for every partition
+ }).toMap
+ streamNames.map(name => name -> new SystemStreamMetadata(name, partitions)).toMap // every stream shares the same partition map
+ }
+
+ @Test
+ def testFetchUncachedMetadataFromSystemAdmin { // a cache miss is served by exactly one SystemAdmin call
+ val systemAdmins = Map("foo" -> mock[SystemAdmin])
+ when(systemAdmins("foo").getSystemStreamMetadata(Set("bar"))).thenReturn(makeMetadata(Set("bar")))
+ val streams = Set(new SystemStream("foo", "bar"))
+ val cache = new StreamMetadataCache(systemAdmins)
+
+ val result = cache.getStreamMetadata(streams)
+ streams shouldEqual result.keySet
+ result(new SystemStream("foo", "bar")).getSystemStreamPartitionMetadata.size shouldBe 4
+ verify(systemAdmins("foo"), times(1)).getSystemStreamMetadata(Set("bar"))
+ }
+
+ @Test
+ def testCacheExpiry { // entries are reused within the TTL and re-fetched once it has passed
+ val clock = mock[Clock]
+ val systemAdmins = Map("system" -> mock[SystemAdmin])
+ when(systemAdmins("system").getSystemStreamMetadata(Set("stream"))).thenReturn(makeMetadata())
+ val streams = Set(new SystemStream("system", "stream"))
+ val cache = new StreamMetadataCache(systemAdmins = systemAdmins, clock = clock)
+
+ when(clock.currentTimeMillis).thenReturn(0) // first call at time 0 populates the cache
+ cache.getStreamMetadata(streams)
+ verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream"))
+
+ when(clock.currentTimeMillis).thenReturn(cache.cacheTTLms / 2) // half the TTL later: still a hit, admin call count stays at 1
+ cache.getStreamMetadata(streams)
+ verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream"))
+
+ when(clock.currentTimeMillis).thenReturn(2 * cache.cacheTTLms) // twice the TTL: the entry has expired
+ cache.getStreamMetadata(streams)
+ cache.getStreamMetadata(streams)
+ cache.getStreamMetadata(streams)
+ verify(systemAdmins("system"), times(2)).getSystemStreamMetadata(Set("stream")) // one refresh serves all three calls
+ }
+
+ @Test
+ def testGroupingRequestsBySystem { // streams are batched into one getSystemStreamMetadata call per system
+ val systemAdmins = Map("sys1" -> mock[SystemAdmin], "sys2" -> mock[SystemAdmin])
+ when(systemAdmins("sys1").getSystemStreamMetadata(Set("stream1a", "stream1b")))
+ .thenReturn(makeMetadata(Set("stream1a", "stream1b"), numPartitions = 3))
+ when(systemAdmins("sys2").getSystemStreamMetadata(Set("stream2a", "stream2b")))
+ .thenReturn(makeMetadata(Set("stream2a", "stream2b"), numPartitions = 5))
+ val streams = Set(
+ new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"),
+ new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b")
+ )
+ val result = new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+ result.keySet shouldEqual streams
+ streams.foreach(stream => {
+ val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
+ result(stream).getSystemStreamPartitionMetadata.size shouldEqual expectedPartitions
+ })
+ verify(systemAdmins("sys1"), times(1)).getSystemStreamMetadata(Set("stream1a", "stream1b"))
+ verify(systemAdmins("sys2"), times(1)).getSystemStreamMetadata(Set("stream2a", "stream2b"))
+ }
+
+ @Test
+ def testSystemOmitsStreamFromResult { // a stream absent from the admin's response is reported as unknown
+ val systemAdmins = Map("system" -> mock[SystemAdmin])
+ when(systemAdmins("system").getSystemStreamMetadata(Set("stream1", "stream2")))
+ .thenReturn(makeMetadata(Set("stream1"))) // metadata doesn't include stream2
+ val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2"))
+ val exception = intercept[SamzaException] {
+ new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+ }
+ exception.getMessage should startWith ("Cannot get metadata for unknown streams")
+ }
+
+ @Test
+ def testSystemReturnsNullMetadata { // a null metadata value is rejected the same way as a missing stream
+ val systemAdmins = Map("system" -> mock[SystemAdmin])
+ when(systemAdmins("system").getSystemStreamMetadata(Set("stream")))
+ .thenReturn(Map("stream" -> null))
+ val streams = Set(new SystemStream("system", "stream"))
+ val exception = intercept[SamzaException] {
+ new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+ }
+ exception.getMessage should startWith ("Cannot get metadata for unknown streams")
+ }
+}