You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/08/24 10:39:15 UTC
samza git commit: SAMZA-736: Fixed infinite loop in BrokerProxy when OOME/Stackoverflow occurs
Repository: samza
Updated Branches:
refs/heads/master 3a4886a65 -> 79ec5dbfc
SAMZA-736: Fixed infinite loop in BrokerProxy when OOME/Stackoverflow occurs
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/79ec5dbf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/79ec5dbf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/79ec5dbf
Branch: refs/heads/master
Commit: 79ec5dbfcf80e9c346baf585dba20e7bd3098ff1
Parents: 3a4886a
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Mon Aug 24 01:38:59 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Mon Aug 24 01:38:59 2015 -0700
----------------------------------------------------------------------
.../samza/util/ExponentialSleepStrategy.scala | 2 +
.../apache/samza/system/kafka/BrokerProxy.scala | 7 +-
.../samza/system/kafka/TestBrokerProxy.scala | 189 ++++++++++++-------
3 files changed, 131 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
index 376b277..4a04c13 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
@@ -84,6 +84,8 @@ class ExponentialSleepStrategy(
} catch {
case e: InterruptedException => throw e
case e: ClosedByInterruptException => throw e
+ case e: OutOfMemoryError => throw e
+ case e: StackOverflowError => throw e
case e: Exception => onException(e, loop)
}
if (!loop.isDone && !Thread.currentThread.isInterrupted) loop.sleep
http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 614f33f..c8cbc38 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -28,6 +28,7 @@ import kafka.api._
import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
import kafka.message.MessageSet
+import org.apache.samza.SamzaException
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.Logging
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
@@ -84,7 +85,7 @@ class BrokerProxy(
val hostString = "%s:%d" format (host, port)
info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
- val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
+ val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
sc
}
@@ -160,8 +161,10 @@ class BrokerProxy(
reconnect = true
})
} catch {
- case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
+ case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
+ case e: OutOfMemoryError => throw new SamzaException("Got out of memory error in broker proxy thread.")
+ case e: StackOverflowError => throw new SamzaException("Got stack overflow error in broker proxy thread.")
}
if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
http://git-wip-us.apache.org/repos/asf/samza/blob/79ec5dbf/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index e285dec..170318e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -21,25 +21,21 @@
package org.apache.samza.system.kafka
import java.nio.ByteBuffer
-import java.nio.channels.ClosedChannelException
import java.util.concurrent.CountDownLatch
-import kafka.api._
-import kafka.api.PartitionOffsetsResponse
-import kafka.common.ErrorMapping
-import kafka.common.TopicAndPartition
+import kafka.api.{PartitionOffsetsResponse, _}
+import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer
-import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
-
+import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
import org.apache.samza.SamzaException
import org.apache.samza.util.Logging
-import org.junit._
import org.junit.Assert._
-import org.mockito.{Matchers, Mockito}
-import org.mockito.Mockito._
+import org.junit._
import org.mockito.Matchers._
-import org.mockito.stubbing.Answer
+import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers, Mockito}
import scala.collection.JavaConversions._
@@ -47,14 +43,56 @@ class TestBrokerProxy extends Logging {
val tp2 = new TopicAndPartition("Redbird", 2013)
var fetchTp1 = true // control whether fetching tp1 messages or not
+ @Test def brokerProxyRetrievesMessagesCorrectly() = {
+ val (bp, tp, sink) = getMockBrokerProxy()
+
+ bp.start
+ bp.addTopicPartition(tp, Option("0"))
+ // Add tp2, which should never receive messages since sink disables it.
+ bp.addTopicPartition(tp2, Option("0"))
+ Thread.sleep(1000)
+ assertEquals(2, sink.receivedMessages.size)
+ assertEquals(42, sink.receivedMessages.get(0)._2.offset)
+ assertEquals(84, sink.receivedMessages.get(1)._2.offset)
+ }
+
+ @Test def brokerProxySkipsFetchForEmptyRequests() = {
+ val (bp, tp, sink) = getMockBrokerProxy()
+
+ bp.start
+ // Only add tp2, which should never receive messages since sink disables it.
+ bp.addTopicPartition(tp2, Option("0"))
+ Thread.sleep(1000)
+ assertEquals(0, sink.receivedMessages.size)
+ assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
+ assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
+ }
+
+ @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
+ val (bp, tp, _) = getMockBrokerProxy()
+ bp.start
+ bp.addTopicPartition(tp, Option("0"))
+
+ try {
+ bp.addTopicPartition(tp, Option("1"))
+ fail("Should have thrown an exception")
+ } catch {
+ case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
+ case other: Exception => fail("Got some other exception than what we were expecting: " + other)
+ }
+ }
+
def getMockBrokerProxy() = {
val sink = new MessageSink {
val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
+
def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
def refreshDropped() {}
- def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) }
+ def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+ receivedMessages.add((tp, msg, msg.offset.equals(highWatermark)))
+ }
def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
}
@@ -82,8 +120,10 @@ class TestBrokerProxy extends Logging {
sink,
offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
- override val sleepMSWhileNoTopicPartitions = 100 // Speed up for test
+ override val sleepMSWhileNoTopicPartitions = 100
+ // Speed up for test
var alreadyCreatedConsumer = false
+
// Scala traits and Mockito mocks don't mix, unfortunately.
override def createSimpleConsumer() = {
if (alreadyCreatedConsumer) {
@@ -139,8 +179,8 @@ class TestBrokerProxy extends Logging {
override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
override def fetch(request: FetchRequest): FetchResponse = {
- // Verify that we only get fetch requests for one tp, even though
- // two were registered. This is to verify that
+ // Verify that we only get fetch requests for one tp, even though
+ // two were registered. This is to verify that
// sink.needsMoreMessages works.
assertEquals(1, request.requestInfo.size)
sc.fetch(request)
@@ -163,45 +203,6 @@ class TestBrokerProxy extends Logging {
(bp, tp, sink)
}
- @Test def brokerProxyRetrievesMessagesCorrectly() = {
- val (bp, tp, sink) = getMockBrokerProxy()
-
- bp.start
- bp.addTopicPartition(tp, Option("0"))
- // Add tp2, which should never receive messages since sink disables it.
- bp.addTopicPartition(tp2, Option("0"))
- Thread.sleep(1000)
- assertEquals(2, sink.receivedMessages.size)
- assertEquals(42, sink.receivedMessages.get(0)._2.offset)
- assertEquals(84, sink.receivedMessages.get(1)._2.offset)
- }
-
- @Test def brokerProxySkipsFetchForEmptyRequests() = {
- val (bp, tp, sink) = getMockBrokerProxy()
-
- bp.start
- // Only add tp2, which should never receive messages since sink disables it.
- bp.addTopicPartition(tp2, Option("0"))
- Thread.sleep(1000)
- assertEquals(0, sink.receivedMessages.size)
- assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
- assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
- }
-
- @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
- val (bp, tp, _) = getMockBrokerProxy()
- bp.start
- bp.addTopicPartition(tp, Option("0"))
-
- try {
- bp.addTopicPartition(tp, Option("1"))
- fail("Should have thrown an exception")
- } catch {
- case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
- case other: Exception => fail("Got some other exception than what we were expecting: " + other)
- }
- }
-
@Test def brokerProxyUpdateLatencyMetrics() = {
val (bp, tp, _) = getMockBrokerProxy()
@@ -221,10 +222,10 @@ class TestBrokerProxy extends Logging {
fetchTp1 = true
}
- @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange():Unit = {
+ @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
// Need to wait for the thread to do some work before ending the test
val countdownLatch = new CountDownLatch(1)
- var failString:String = null
+ var failString: String = null
val mockMessageSink = mock(classOf[MessageSink])
when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
@@ -243,13 +244,14 @@ class TestBrokerProxy extends Logging {
// Create an answer that first indicates offset out of range on first invocation and on second
// verifies that the parameters have been updated to what we expect them to be
- val answer = new Answer[FetchResponse](){
+ val answer = new Answer[FetchResponse]() {
var invocationCount = 0
+
def answer(invocation: InvocationOnMock): FetchResponse = {
val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
- if(invocationCount == 0) {
- if(arguments != (tp, 0)) {
+ if (invocationCount == 0) {
+ if (arguments !=(tp, 0)) {
failString = "First invocation did not have the right arguments: " + arguments
countdownLatch.countDown()
}
@@ -266,7 +268,7 @@ class TestBrokerProxy extends Logging {
invocationCount += 1
mfr
} else {
- if(arguments != (tp, 1492)) {
+ if (arguments !=(tp, 1492)) {
failString = "On second invocation, arguments were not correct: " + arguments
}
countdownLatch.countDown()
@@ -275,7 +277,7 @@ class TestBrokerProxy extends Logging {
}
}
}
-
+
when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
// So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
@@ -283,7 +285,7 @@ class TestBrokerProxy extends Logging {
val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
override def createSimpleConsumer() = {
- if(callsToCreateSimpleConsumer > 1) {
+ if (callsToCreateSimpleConsumer > 1) {
failString = "Tried to create more than one simple consumer"
countdownLatch.countDown()
}
@@ -296,31 +298,35 @@ class TestBrokerProxy extends Logging {
bp.start
countdownLatch.await()
bp.stop
- if(failString != null) {
+ if (failString != null) {
fail(failString)
}
}
/**
- * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
+ * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
* that it owns when a consumer failure occurs.
*/
- @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = {
+ @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
val countdownLatch = new CountDownLatch(1)
var abdicated: Option[TopicAndPartition] = None
@volatile var refreshDroppedCount = 0
val mockMessageSink = new MessageSink {
override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
}
+
override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
}
+
override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
abdicated = Some(tp)
countdownLatch.countDown
}
+
override def refreshDropped() {
refreshDroppedCount += 1
}
+
override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
true
}
@@ -358,4 +364,57 @@ class TestBrokerProxy extends Logging {
bp.stop
assertEquals(tp, abdicated.getOrElse(null))
}
+
+ @Test def brokerProxyAbdicatesHardErrors(): Unit = {
+ val doNothingMetrics = new KafkaSystemConsumerMetrics
+ val mockMessageSink = new MessageSink {
+ override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+ override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+ override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+ override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
+ override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+ }
+ val mockOffsetGetter = mock(classOf[GetOffset])
+ val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+ val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+ override def createSimpleConsumer() = {
+ mockSimpleConsumer
+ }
+ }
+ var caughtError = false
+ try {
+ bp.thread.run
+ } catch {
+ case e: SamzaException => {
+ assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.")
+ info("Received OutOfMemoryError in broker proxy.")
+ caughtError = true
+ }
+ }
+ assertEquals(true, caughtError)
+ val mockMessageSink2 = new MessageSink {
+ override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+ override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
+ override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
+ override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
+ override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+ }
+ caughtError = false
+ val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+ override def createSimpleConsumer() = {
+ mockSimpleConsumer
+ }
+ }
+ try {
+ bp2.thread.run
+ } catch {
+ case e: SamzaException => {
+ assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.")
+ info("Received StackOverflowError in broker proxy.")
+ caughtError = true
+ }
+ }
+ assertEquals(true, caughtError)
+ }
}