You are viewing a plain-text version of this content; the canonical version is linked from the original mailing-list archive page.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/03/18 22:13:06 UTC

[samza] branch master updated: SAMZA-2129: Move the offset comparison check to SystemConsumers layer. (#954)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 275d757  SAMZA-2129: Move the offset comparison check to SystemConsumers layer. (#954)
275d757 is described below

commit 275d7570bc65aa3a0592ea3a6ec4063b91c4b222
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Mon Mar 18 15:13:01 2019 -0700

    SAMZA-2129: Move the offset comparison check to SystemConsumers layer. (#954)
    
    1. Currently, the check is duplicated across some of the SystemConsumer implementations.
    2. Some SystemConsumer implementations do not perform this offsetComparator check
    in their implementation of the register method.
    
    Moving this check one level up, from the SystemConsumer.register(SystemStreamPartition, offset) implementations to SystemConsumers.register(SystemStreamPartition, offset), removes unnecessary duplication and ensures the check is applied for every SystemConsumer.
---
 .../apache/samza/container/SamzaContainer.scala    |  1 +
 .../samza/storage/ContainerStorageManager.java     |  2 +-
 .../org/apache/samza/system/SystemConsumers.scala  | 21 +++++++-
 .../org/apache/samza/task/TestAsyncRunLoop.java    | 10 +++-
 .../samza/processor/StreamProcessorTestUtils.scala |  2 +-
 .../apache/samza/system/TestSystemConsumers.scala  | 62 +++++++++++++++-------
 6 files changed, 75 insertions(+), 23 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ab89396..70ff87d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -464,6 +464,7 @@ object SamzaContainer extends Logging {
     val consumerMultiplexer = new SystemConsumers(
       chooser = chooser,
       consumers = consumers,
+      systemAdmins = systemAdmins, // used by SystemConsumers.register to compare offsets registered for the same SSP
       serdeManager = serdeManager,
       metrics = systemConsumersMetrics,
       dropDeserializationError = dropDeserializationError,
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index da61a35..a9443b4 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -237,7 +237,7 @@ public class ContainerStorageManager {
           sideInputSystemConsumersMetrics.registry(), systemAdmins);
 
       sideInputSystemConsumers =
-          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
+          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager, // systemAdmins: used by SystemConsumers.register for offset comparison
               sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
               SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 68b2f09..8408433 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -63,6 +63,11 @@ class SystemConsumers (
   consumers: Map[String, SystemConsumer],

   /**
+   * Provides a mapping from system name to a [[SystemAdmin]], used to compare offsets registered for the same partition.
+   */
+  systemAdmins: SystemAdmins,
+
+  /**
    * The class that handles deserialization of incoming messages.
    */
   serdeManager: SerdeManager = new SerdeManager,
@@ -111,6 +116,11 @@ class SystemConsumers (
   val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {

   /**
+   * Mapping from each [[SystemStreamPartition]] to the smallest offset registered for it (a null offsetComparator result keeps the existing offset).
+   */
+  private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
+
+  /**
    * A buffer of incoming messages grouped by SystemStreamPartition. These
    * messages are handed out to the MessageChooser as it needs them.
    */
@@ -154,6 +164,11 @@ class SystemConsumers (
   metrics.setUnprocessedMessages(() => totalUnprocessedMessages)

   def start {
+    for ((systemStreamPartition, offset) <- sspToRegisteredOffsets.asScala) {
+      val consumer = consumers(systemStreamPartition.getSystem)
+      consumer.register(systemStreamPartition, offset)
+    }
+
     debug("Starting consumers.")
     emptySystemStreamPartitionsBySystem.asScala ++= unprocessedMessagesBySSP
       .keySet
@@ -208,7 +223,11 @@ class SystemConsumers (
       if (startpoint != null) {
         consumer.register(systemStreamPartition, startpoint)
       } else {
-        consumer.register(systemStreamPartition, offset)
+        val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+        val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+        if (existingOffset == null || Option(systemAdmin.offsetComparator(existingOffset, offset)).exists(_.intValue > 0)) {
+          sspToRegisteredOffsets.put(systemStreamPartition, offset)
+        }
       }
     } catch {
       case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e)
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 48f8619..6197690 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -41,6 +41,8 @@ import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
@@ -48,6 +50,7 @@ import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 import scala.Option;
 import scala.collection.JavaConverters;
 
@@ -636,9 +639,14 @@ public class TestAsyncRunLoop {
     SystemConsumer mockConsumer = mock(SystemConsumer.class);
     when(mockConsumer.poll(anyObject(), anyLong())).thenReturn(sspMap);
 
+    SystemAdmins systemAdmins = Mockito.mock(SystemAdmins.class); // SystemConsumers.register looks up the admin for each SSP's system
+    Mockito.when(systemAdmins.getSystemAdmin("system1")).thenReturn(Mockito.mock(SystemAdmin.class));
+    Mockito.when(systemAdmins.getSystemAdmin("testSystem")).thenReturn(Mockito.mock(SystemAdmin.class));
+
     HashMap<String, SystemConsumer> systemConsumerMap = new HashMap<>();
     systemConsumerMap.put("system1", mockConsumer);
-    SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap);
+
+    SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap, systemAdmins);
 
     TaskName taskName1 = new TaskName("task1");
     TaskName taskName2 = new TaskName("task2");
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 3ff651b..5ab7635 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -41,7 +41,7 @@ object StreamProcessorTestUtils {
     val adminMultiplexer = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
-      Map[String, SystemConsumer]())
+      Map[String, SystemConsumer](), SystemAdmins.empty()) // consumer map is empty; presumably no admin lookups happen here — TODO confirm
     val producerMultiplexer = new SystemProducers(
       Map[String, SystemProducer](),
       new SerdeManager)
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 0732311..15e2627 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -44,7 +44,10 @@ class TestSystemConsumers {
     val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
     val consumer = new CustomPollResponseSystemConsumer(envelope)
     var now = 0L
-    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
+
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins,
                                          new SerdeManager, new SystemConsumersMetrics,
                                          SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
                                          SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
@@ -106,7 +109,10 @@ class TestSystemConsumers {
     val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
     val consumer = new CustomPollResponseSystemConsumer(envelope)
     var now = 0
-    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
+
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins,
                                          new SerdeManager, new SystemConsumersMetrics,
                                          SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
                                          SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
@@ -167,13 +173,16 @@ class TestSystemConsumers {
       def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava
     })

+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
+
     val consumers = new SystemConsumers(new MessageChooser {
       def update(envelope: IncomingMessageEnvelope) = Unit
       def choose = null
       def start = chooserStarted += 1
       def stop = chooserStopped += 1
       def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
-    }, consumer, null)
+    }, consumer, systemAdmins)

     consumers.register(systemStreamPartition, "0", null)
     consumers.start
@@ -231,15 +240,17 @@ class TestSystemConsumers {
     val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
     val msgChooser = new DefaultChooser
     val consumer = Map(system -> new SerializingConsumer)
-    val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]);
+    val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]])
     val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))

     // throw exceptions when the deserialization has error
-    val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
+    val consumers = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = false)
     consumers.register(systemStreamPartition, "0", null)
-    consumer(system).putBytesMessage
-    consumer(system).putStringMessage
     consumers.start
+    consumer(system).putStringMessage
+    consumer(system).putBytesMessage

     var caughtRightException = false
     try {
@@ -248,16 +259,16 @@ class TestSystemConsumers {
       case e: SystemConsumersException => caughtRightException = true
       case _: Throwable => caughtRightException = false
     }
-    assertTrue("suppose to throw SystemConsumersException", caughtRightException);
+    assertTrue("suppose to throw SystemConsumersException", caughtRightException)
     consumers.stop

     // it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
-    val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true)
+    val consumers2 = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = true)
     consumers2.register(systemStreamPartition, "0", null)
+    consumers2.start
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
     consumer(system).putBytesMessage
-    consumers2.start

     var notThrowException = true;
     try {
@@ -299,8 +310,10 @@ class TestSystemConsumers {
     val normalEnvelope = new IncomingMessageEnvelope(systemStreamPartition1, "1", "k", "v")
     val endOfStreamEnvelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition2)
     val consumer = new CustomPollResponseSystemConsumer(normalEnvelope)
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
     val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
-      new SerdeManager, new SystemConsumersMetrics,
+      systemAdmins, new SerdeManager, new SystemConsumersMetrics,
       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
@@ -350,8 +363,11 @@ class TestSystemConsumers {

     val consumer = Mockito.mock(classOf[SystemConsumer])
     val startpoint = Mockito.mock(classOf[Startpoint])
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
+
     val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
-      new SerdeManager, new SystemConsumersMetrics,
+      systemAdmins, new SerdeManager, new SystemConsumersMetrics,
       SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
@@ -394,20 +410,28 @@ class TestSystemConsumers {
    */
   private class SerializingConsumer extends BlockingEnvelopeMap {
     val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1))
-    def putBytesMessage {
+    def putBytesMessage() {
       put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes()))
     }
-    def putStringMessage {
+    def putStringMessage() {
       put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "1", "test"))
     }
-    def start {}
-    def stop {}
-    def register { super.register(systemStreamPartition, "0") }
+    def start() {}
+    def stop() {}
+
+    override def register(systemStreamPartition: SystemStreamPartition, offset: String): Unit = {
+      super[BlockingEnvelopeMap].register(systemStreamPartition, offset)
+    }
+
+    override def register(systemStreamPartition: SystemStreamPartition, startpoint: Startpoint): Unit = {
+      super[BlockingEnvelopeMap].register(systemStreamPartition, startpoint)
+    }
+
   }
 }

 object TestSystemConsumers {
-  def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer]) : SystemConsumers = {
-    new SystemConsumers(new DefaultChooser, consumers.asScala.toMap)
+  def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer], systemAdmins: SystemAdmins = SystemAdmins.empty()) : SystemConsumers = {
+    new SystemConsumers(new DefaultChooser, consumers.asScala.toMap, systemAdmins)
   }
 }