You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ma...@apache.org on 2014/04/07 23:00:35 UTC

git commit: SAMZA-223: Add a cache for stream metadata. Reviewed by Chris Riccomini and Jakob Homan.

Repository: incubator-samza
Updated Branches:
  refs/heads/master b4a323d16 -> 4b8514f22


SAMZA-223: Add a cache for stream metadata. Reviewed by Chris Riccomini and Jakob Homan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4b8514f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4b8514f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4b8514f2

Branch: refs/heads/master
Commit: 4b8514f226cc75ca663f81954f4683f61638a80b
Parents: b4a323d
Author: Martin Kleppmann <mk...@linkedin.com>
Authored: Fri Apr 4 22:23:42 2014 +0100
Committer: Martin Kleppmann <mk...@linkedin.com>
Committed: Mon Apr 7 14:00:01 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  25 +---
 .../samza/system/StreamMetadataCache.scala      |  91 ++++++++++++++
 .../org/apache/samza/util/SystemClock.scala     |  32 +++++
 .../main/scala/org/apache/samza/util/Util.scala |  35 +++---
 .../samza/container/TestSamzaContainer.scala    |   3 +-
 .../samza/system/TestStreamMetadataCache.scala  | 120 +++++++++++++++++++
 6 files changed, 264 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ef14643..24364f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -63,6 +63,7 @@ import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.system.StreamMetadataCache
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
@@ -122,7 +123,8 @@ object SamzaContainer extends Logging {
 
     info("Got system factories: %s" format systemFactories.keys)
 
-    val inputStreamMetadata = getStreamMetadata(inputStreams.map(_.getSystemStream), systemAdmins)
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputStreams.map(_.getSystemStream))
 
     info("Got input stream metadata: %s" format inputStreamMetadata)
 
@@ -224,7 +226,7 @@ object SamzaContainer extends Logging {
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
-    val changeLogMetadata = getStreamMetadata(changeLogSystemStreams.values.toSet, systemAdmins)
+    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
 
     info("Got change log stream metadata: %s" format changeLogMetadata)
 
@@ -445,25 +447,6 @@ object SamzaContainer extends Logging {
   }
 
   /**
-   * Builds a map from SystemStream to SystemStreamMetadata for all streams
-   * using SystemAdmins to fetch metadata.
-   */
-  def getStreamMetadata(streams: Set[SystemStream], systemAdmins: Map[String, SystemAdmin]): Map[SystemStream, SystemStreamMetadata] = {
-    streams
-      .groupBy[String](_.getSystem)
-      .flatMap {
-        case (systemName, systemStreams) =>
-          systemAdmins
-            .getOrElse(systemName, throw new SamzaException("Unable to find system admin definition for system %s." format systemName))
-            .getSystemStreamMetadata(systemStreams.map(_.getStream))
-            .map {
-              case (streamName, metadata) => (new SystemStream(systemName, streamName), metadata)
-            }
-      }
-      .toMap
-  }
-
-  /**
    * Builds a map from SystemStreamPartition to oldest offset for changelogs.
    */
   def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
new file mode 100644
index 0000000..aab81e5
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import grizzled.slf4j.Logging
+import org.apache.samza.SamzaException
+import org.apache.samza.util.{Clock, SystemClock}
+import scala.collection.JavaConversions._
+
+/**
+ * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
+ * 5 seconds), so that we can make many metadata requests in quick succession without
+ * hammering the actual systems. This is useful for example during task startup, when
+ * each task independently fetches the offsets for its own partition.
+ */
+class StreamMetadataCache (
+    /** System implementations from which the actual metadata is loaded on cache miss */
+    systemAdmins: Map[String, SystemAdmin],
+
+    /** Maximum age (in milliseconds) of a cache entry */
+    val cacheTTLms: Integer = 5000,
+
+    /** Clock used for determining expiry (for mocking in tests) */
+    clock: Clock = SystemClock.instance) extends Logging {
+
+  private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
+  // Written only while holding `lock`, but read without it, so the reference has to
+  // be volatile for readers to be guaranteed to see the most recently published map.
+  @volatile private var cache = Map[SystemStream, CacheEntry]()
+  private val lock = new Object
+
+  /**
+   * Returns metadata about each of the given streams (such as first offset, newest
+   * offset, etc). Entries no older than cacheTTLms are served from the cache; the
+   * remaining streams are fetched from their SystemAdmins, one request per system.
+   *
+   * Throws SamzaException if a stream belongs to a system with no SystemAdmin, or if
+   * a SystemAdmin returns no (or null) metadata for a requested stream.
+   */
+  def getStreamMetadata(streams: Set[SystemStream]): Map[SystemStream, SystemStreamMetadata] = {
+    val time = clock.currentTimeMillis
+    val cacheHits = streams.flatMap(stream => getFromCache(stream, time)).toMap
+
+    val cacheMisses = (streams -- cacheHits.keySet)
+      .groupBy[String](_.getSystem)
+      .flatMap {
+        case (systemName, systemStreams) =>
+          systemAdmins
+            .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
+            .getSystemStreamMetadata(systemStreams.map(_.getStream))
+            .map { case (streamName, metadata) => new SystemStream(systemName, streamName) -> metadata }
+      }
+      .toMap
+
+    val allResults = cacheHits ++ cacheMisses
+    // A stream counts as missing if it is absent from the results or mapped to null.
+    val missing = streams.filterNot(stream => allResults.get(stream).exists(_ != null))
+    if (missing.nonEmpty) {
+      throw new SamzaException("Cannot get metadata for unknown streams: " + missing.mkString(", "))
+    }
+    cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
+    allResults
+  }
+
+  /** Returns the cached (stream -> metadata) pair, or None if absent or older than cacheTTLms. */
+  private def getFromCache(stream: SystemStream, now: Long) =
+    cache.get(stream).collect {
+      case CacheEntry(metadata, lastRefresh) if now - lastRefresh <= cacheTTLms => stream -> metadata
+    }
+
+  /** Stores the metadata with refresh time `now`, replacing any existing entry for the stream. */
+  private def addToCache(stream: SystemStream, metadata: SystemStreamMetadata, now: Long): Unit =
+    lock synchronized { cache += stream -> CacheEntry(metadata, now) }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala b/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
new file mode 100644
index 0000000..62253d6
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/util/SystemClock.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util
+
+/**
+ * Default Clock implementation: reports the time of the real system
+ * clock, as returned by System.currentTimeMillis.
+ */
+class SystemClock extends Clock {
+  override def currentTimeMillis: Long = System.currentTimeMillis()
+}
+
+object SystemClock {
+  val instance: SystemClock = new SystemClock
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 2bd2c1c..1b548fd 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -27,7 +27,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import scala.collection.JavaConversions._
-import org.apache.samza.system.{ SystemStreamPartition, SystemFactory, SystemStream }
+import org.apache.samza.system.{SystemStreamPartition, SystemFactory, StreamMetadataCache, SystemStream}
 import org.codehaus.jackson.map.ObjectMapper
 import org.codehaus.jackson.`type`.TypeReference
 import java.util
@@ -88,29 +88,24 @@ object Util extends Logging {
     val inputSystemStreams = config.getInputStreams
     val systemNames = config.getSystemNames.toSet
 
-    systemNames.flatMap(systemName => {
-      // Get SystemAdmin for system.
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = systemNames.map(systemName => {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
       val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      val systemAdmin = systemFactory.getAdmin(systemName, config)
-
-      // Get metadata for every stream that belongs to this system.
-      val streamNames = inputSystemStreams
-        .filter(_.getSystem.equals(systemName))
-        .map(_.getStream)
-      val systemStreamMetadata = systemAdmin.getSystemStreamMetadata(streamNames)
-
-      // Get a set of all SSPs for every stream that belongs to this system.
-      systemStreamMetadata
-        .values
-        .flatMap(systemStreamMetadata => {
-          val streamName = systemStreamMetadata.getStreamName
-          val systemStreamPartitionSet = systemStreamMetadata.getSystemStreamPartitionMetadata.keys
-          systemStreamPartitionSet.map(new SystemStreamPartition(systemName, streamName, _)).toSet
-        })
-    })
+      systemName -> systemFactory.getAdmin(systemName, config)
+    }).toMap
+
+    // Get the set of partitions for each SystemStream from the stream metadata
+    new StreamMetadataCache(systemAdmins)
+      .getStreamMetadata(inputSystemStreams)
+      .flatMap { case (systemStream, metadata) =>
+        metadata
+          .getSystemStreamPartitionMetadata
+          .keys
+          .map(new SystemStreamPartition(systemStream, _))
+      }.toSet
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a2d5820..699f6a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -42,6 +42,7 @@ import org.apache.samza.task.ClosableTask
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.system.SystemStream
+import org.apache.samza.system.StreamMetadataCache
 
 class TestSamzaContainer {
   @Test
@@ -52,7 +53,7 @@ class TestSamzaContainer {
       new SystemStreamPartition("test", "stream2", new Partition(0)),
       new SystemStreamPartition("test", "stream2", new Partition(1)))
     val systemAdmins = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
-    val metadata = SamzaContainer.getStreamMetadata(inputStreams.map(_.getSystemStream).toSet, systemAdmins)
+    val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream).toSet)
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
     val stream1Metadata = metadata(new SystemStream("test", "stream1"))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4b8514f2/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
new file mode 100644
index 0000000..264d1b5
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system
+
+import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.util.Clock
+import org.junit.{Before, Test}
+import org.mockito.Mockito._
+import org.scalatest.Matchers._
+import org.scalatest.junit.AssertionsForJUnit
+import org.scalatest.mock.MockitoSugar
+import scala.collection.JavaConversions._
+
+class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar {
+  /** Builds a stream-name -> metadata map in which every stream has numPartitions partitions. */
+  def makeMetadata(streamNames: Set[String] = Set("stream"), numPartitions: Int = 4) = {
+    val partitionMetadata = (0 until numPartitions)
+      .map(id => new Partition(id) -> new SystemStreamPartitionMetadata("oldest", "newest", "upcoming"))
+      .toMap
+    streamNames.map(streamName => streamName -> new SystemStreamMetadata(streamName, partitionMetadata)).toMap
+  }
+
+  @Test
+  def testFetchUncachedMetadataFromSystemAdmin {
+    val admin = mock[SystemAdmin]
+    val stream = new SystemStream("foo", "bar")
+    when(admin.getSystemStreamMetadata(Set("bar"))).thenReturn(makeMetadata(Set("bar")))
+
+    val result = new StreamMetadataCache(Map("foo" -> admin)).getStreamMetadata(Set(stream))
+    Set(stream) shouldEqual result.keySet
+    result(stream).getSystemStreamPartitionMetadata.size shouldBe 4
+    verify(admin, times(1)).getSystemStreamMetadata(Set("bar"))
+  }
+
+  @Test
+  def testCacheExpiry {
+    val clock = mock[Clock]
+    val admin = mock[SystemAdmin]
+    when(admin.getSystemStreamMetadata(Set("stream"))).thenReturn(makeMetadata())
+    val requested = Set(new SystemStream("system", "stream"))
+    val cache = new StreamMetadataCache(systemAdmins = Map("system" -> admin), clock = clock)
+
+    // The first request, at time 0, is expected to reach the SystemAdmin exactly once.
+    when(clock.currentTimeMillis).thenReturn(0)
+    cache.getStreamMetadata(requested)
+    verify(admin, times(1)).getSystemStreamMetadata(Set("stream"))
+
+    // Half a TTL later the entry should still be fresh: no additional SystemAdmin call.
+    when(clock.currentTimeMillis).thenReturn(cache.cacheTTLms / 2)
+    cache.getStreamMetadata(requested)
+    verify(admin, times(1)).getSystemStreamMetadata(Set("stream"))
+
+    // At twice the TTL the entry has expired: one refetch should serve all three requests.
+    when(clock.currentTimeMillis).thenReturn(2 * cache.cacheTTLms)
+    (1 to 3).foreach(_ => cache.getStreamMetadata(requested))
+    verify(admin, times(2)).getSystemStreamMetadata(Set("stream"))
+  }
+
+  @Test
+  def testGroupingRequestsBySystem {
+    val sys1Streams = Set("stream1a", "stream1b")
+    val sys2Streams = Set("stream2a", "stream2b")
+    val (admin1, admin2) = (mock[SystemAdmin], mock[SystemAdmin])
+    when(admin1.getSystemStreamMetadata(sys1Streams)).thenReturn(makeMetadata(sys1Streams, numPartitions = 3))
+    when(admin2.getSystemStreamMetadata(sys2Streams)).thenReturn(makeMetadata(sys2Streams, numPartitions = 5))
+
+    val streams = sys1Streams.map(new SystemStream("sys1", _)) ++ sys2Streams.map(new SystemStream("sys2", _))
+    val result = new StreamMetadataCache(Map("sys1" -> admin1, "sys2" -> admin2)).getStreamMetadata(streams)
+    result.keySet shouldEqual streams
+    for (stream <- streams) {
+      val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
+      result(stream).getSystemStreamPartitionMetadata.size shouldEqual expectedPartitions
+    }
+    verify(admin1, times(1)).getSystemStreamMetadata(sys1Streams)
+    verify(admin2, times(1)).getSystemStreamMetadata(sys2Streams)
+  }
+
+  @Test
+  def testSystemOmitsStreamFromResult {
+    val admin = mock[SystemAdmin]
+    // The stubbed response covers stream1 only; stream2 is left out of the metadata.
+    when(admin.getSystemStreamMetadata(Set("stream1", "stream2"))).thenReturn(makeMetadata(Set("stream1")))
+    val requested = Set("stream1", "stream2").map(new SystemStream("system", _))
+
+    val thrown = intercept[SamzaException] {
+      new StreamMetadataCache(Map("system" -> admin)).getStreamMetadata(requested)
+    }
+    thrown.getMessage should startWith ("Cannot get metadata for unknown streams")
+  }
+
+  @Test
+  def testSystemReturnsNullMetadata {
+    val admin = mock[SystemAdmin]
+    when(admin.getSystemStreamMetadata(Set("stream"))).thenReturn(Map("stream" -> null))
+    val requested = Set(new SystemStream("system", "stream"))
+
+    val thrown = intercept[SamzaException] {
+      new StreamMetadataCache(Map("system" -> admin)).getStreamMetadata(requested)
+    }
+    thrown.getMessage should startWith ("Cannot get metadata for unknown streams")
+  }
+}