You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/08/24 10:39:15 UTC

samza git commit: SAMZA-736: Fixed infinite loop in BrokerProxy when OOME/Stackoverflow occurs

Repository: samza
Updated Branches:
  refs/heads/master 3a4886a65 -> 79ec5dbfc


SAMZA-736: Fixed infinite loop in BrokerProxy when OOME/Stackoverflow occurs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/79ec5dbf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/79ec5dbf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/79ec5dbf

Branch: refs/heads/master
Commit: 79ec5dbfcf80e9c346baf585dba20e7bd3098ff1
Parents: 3a4886a
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Mon Aug 24 01:38:59 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Mon Aug 24 01:38:59 2015 -0700

----------------------------------------------------------------------
 .../samza/util/ExponentialSleepStrategy.scala   |   2 +
 .../apache/samza/system/kafka/BrokerProxy.scala |   7 +-
 .../samza/system/kafka/TestBrokerProxy.scala    | 189 ++++++++++++-------
 3 files changed, 131 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
index 376b277..4a04c13 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -84,6 +84,8 @@ class ExponentialSleepStrategy(
       } catch {
         case e: InterruptedException       => throw e
         case e: ClosedByInterruptException => throw e
+        case e: OutOfMemoryError           => throw e
+        case e: StackOverflowError         => throw e
         case e: Exception                  => onException(e, loop)
       }
       if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep

http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 614f33f..c8cbc38 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -28,6 +28,7 @@ import kafka.api._
 import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import kafka.message.MessageSet
+import org.apache.samza.SamzaException
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
@@ -84,7 +85,7 @@ class BrokerProxy(
     val hostString = "%s:%d" format (host, port)
     info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
 
-    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) 
+    val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
     sc
   }
 
@@ -160,8 +161,10 @@ class BrokerProxy(
             reconnect = true
           })
       } catch {
-        case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
+        case e: InterruptedException       => info("Got interrupt exception in broker proxy thread.")
         case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
+        case e: OutOfMemoryError           => throw new SamzaException("Got out of memory error in broker proxy thread.")
+        case e: StackOverflowError         => throw new SamzaException("Got stack overflow error in broker proxy thread.")
       }
 
       if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")

http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index e285dec..170318e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -21,25 +21,21 @@
 package org.apache.samza.system.kafka
 
 import java.nio.ByteBuffer
-import java.nio.channels.ClosedChannelException
 import java.util.concurrent.CountDownLatch
 
-import kafka.api._
-import kafka.api.PartitionOffsetsResponse
-import kafka.common.ErrorMapping
-import kafka.common.TopicAndPartition
+import kafka.api.{PartitionOffsetsResponse, _}
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.SimpleConsumer
-import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
-
+import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
 import org.apache.samza.SamzaException
 import org.apache.samza.util.Logging
-import org.junit._
 import org.junit.Assert._
-import org.mockito.{Matchers, Mockito}
-import org.mockito.Mockito._
+import org.junit._
 import org.mockito.Matchers._
-import org.mockito.stubbing.Answer
+import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers, Mockito}
 
 import scala.collection.JavaConversions._
 
@@ -47,14 +43,56 @@ class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
   var fetchTp1 = true // control whether fetching tp1 messages or not
 
+  @Test def brokerProxyRetrievesMessagesCorrectly() = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+    // Add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, Option("0"))
+    Thread.sleep(1000)
+    assertEquals(2, sink.receivedMessages.size)
+    assertEquals(42, sink.receivedMessages.get(0)._2.offset)
+    assertEquals(84, sink.receivedMessages.get(1)._2.offset)
+  }
+
+  @Test def brokerProxySkipsFetchForEmptyRequests() = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    // Only add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, Option("0"))
+    Thread.sleep(1000)
+    assertEquals(0, sink.receivedMessages.size)
+    assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
+    assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
+  }
+
+  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+
+    try {
+      bp.addTopicPartition(tp, Option("1"))
+      fail("Should have thrown an exception")
+    } catch {
+      case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
+      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
+    }
+  }
+
   def getMockBrokerProxy() = {
     val sink = new MessageSink {
       val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
+
       def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
 
       def refreshDropped() {}
 
-      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) }
+      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+        receivedMessages.add((tp, msg, msg.offset.equals(highWatermark)))
+      }
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
       }
@@ -82,8 +120,10 @@ class TestBrokerProxy extends Logging {
       sink,
       offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
 
-      override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test
+      override val sleepMSWhileNoTopicPartitions = 100
+      // Speed up for test
       var alreadyCreatedConsumer = false
+
       // Scala traits and Mockito mocks don't mix, unfortunately.
       override def createSimpleConsumer() = {
         if (alreadyCreatedConsumer) {
@@ -139,8 +179,8 @@ class TestBrokerProxy extends Logging {
           override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
 
           override def fetch(request: FetchRequest): FetchResponse = {
-            // Verify that we only get fetch requests for one tp, even though 
-            // two were registered. This is to verify that 
+            // Verify that we only get fetch requests for one tp, even though
+            // two were registered. This is to verify that
             // sink.needsMoreMessages works.
             assertEquals(1, request.requestInfo.size)
             sc.fetch(request)
@@ -163,45 +203,6 @@ class TestBrokerProxy extends Logging {
     (bp, tp, sink)
   }
 
-  @Test def brokerProxyRetrievesMessagesCorrectly() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-    // Add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages.get(0)._2.offset)
-    assertEquals(84, sink.receivedMessages.get(1)._2.offset)
-  }
-
-  @Test def brokerProxySkipsFetchForEmptyRequests() = {
-    val (bp, tp, sink) = getMockBrokerProxy()
-
-    bp.start
-    // Only add tp2, which should never receive messages since sink disables it.
-    bp.addTopicPartition(tp2, Option("0"))
-    Thread.sleep(1000)
-    assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
-  }
-
-  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
-    val (bp, tp, _) = getMockBrokerProxy()
-    bp.start
-    bp.addTopicPartition(tp, Option("0"))
-
-    try {
-      bp.addTopicPartition(tp, Option("1"))
-      fail("Should have thrown an exception")
-    } catch {
-      case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
-      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
-    }
-  }
-
   @Test def brokerProxyUpdateLatencyMetrics() = {
     val (bp, tp, _) = getMockBrokerProxy()
 
@@ -221,10 +222,10 @@ class TestBrokerProxy extends Logging {
     fetchTp1 = true
   }
 
-  @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = {
+ @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
     // Need to wait for the thread to do some work before ending the test
     val countdownLatch = new CountDownLatch(1)
-    var failString:String = null
+    var failString: String = null
 
     val mockMessageSink = mock(classOf[MessageSink])
     when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
@@ -243,13 +244,14 @@ class TestBrokerProxy extends Logging {
 
     // Create an answer that first indicates offset out of range on first invocation and on second
     // verifies that the parameters have been updated to what we expect them to be
-    val answer = new Answer[FetchResponse](){
+    val answer = new Answer[FetchResponse]() {
       var invocationCount = 0
+
       def answer(invocation: InvocationOnMock): FetchResponse = {
         val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
 
-        if(invocationCount == 0) {
-          if(arguments != (tp, 0)) {
+        if (invocationCount == 0) {
+          if (arguments !=(tp, 0)) {
             failString = "First invocation did not have the right arguments: " + arguments
             countdownLatch.countDown()
           }
@@ -266,7 +268,7 @@ class TestBrokerProxy extends Logging {
           invocationCount += 1
           mfr
         } else {
-          if(arguments != (tp, 1492)) {
+          if (arguments !=(tp, 1492)) {
             failString = "On second invocation, arguments were not correct: " + arguments
           }
           countdownLatch.countDown()
@@ -275,7 +277,7 @@ class TestBrokerProxy extends Logging {
         }
       }
     }
-    
+
     when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
 
     // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
@@ -283,7 +285,7 @@ class TestBrokerProxy extends Logging {
     val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
 
       override def createSimpleConsumer() = {
-        if(callsToCreateSimpleConsumer > 1) {
+        if (callsToCreateSimpleConsumer > 1) {
           failString = "Tried to create more than one simple consumer"
           countdownLatch.countDown()
         }
@@ -296,31 +298,35 @@ class TestBrokerProxy extends Logging {
     bp.start
     countdownLatch.await()
     bp.stop
-    if(failString != null) {
+    if (failString != null) {
       fail(failString)
     }
   }
 
   /**
-   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions 
+   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
    * that it owns when a consumer failure occurs.
    */
-  @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = {
+  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
     val countdownLatch = new CountDownLatch(1)
     var abdicated: Option[TopicAndPartition] = None
     @volatile var refreshDroppedCount = 0
     val mockMessageSink = new MessageSink {
       override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
       }
+
       override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
       }
+
       override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
         abdicated = Some(tp)
         countdownLatch.countDown
       }
+
       override def refreshDropped() {
         refreshDroppedCount += 1
       }
+
       override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
         true
       }
@@ -358,4 +364,57 @@ class TestBrokerProxy extends Logging {
     bp.stop
     assertEquals(tp, abdicated.getOrElse(null))
   }
+
+  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
+    val doNothingMetrics = new KafkaSystemConsumerMetrics
+    val mockMessageSink = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    var caughtError = false
+    try {
+      bp.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.")
+        info("Received OutOfMemoryError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertEquals(true, caughtError)
+    val mockMessageSink2 = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
+      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    caughtError = false
+    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    try {
+      bp2.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.")
+        info("Received StackOverflowError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertEquals(true, caughtError)
+  }
 }