You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/28 03:57:24 UTC
samza git commit: SAMZA-579; don't drop SSPs when a topic metadata refresh fails in kafka system consumer
Repository: samza
Updated Branches:
refs/heads/master 922771ea4 -> dd18d5ae4
SAMZA-579; don't drop SSPs when a topic metadata refresh fails in kafka system consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dd18d5ae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dd18d5ae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dd18d5ae
Branch: refs/heads/master
Commit: dd18d5ae4f4f8440534cbeffdc494b9a07d1f9e8
Parents: 922771e
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 27 17:18:04 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 27 17:18:04 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 3 +-
.../apache/samza/system/kafka/BrokerProxy.scala | 4 +-
.../system/kafka/KafkaSystemConsumer.scala | 60 +++++++++++++-------
.../apache/samza/system/kafka/MessageSink.scala | 5 +-
.../samza/system/kafka/TestBrokerProxy.scala | 2 +
.../system/kafka/TestKafkaSystemConsumer.scala | 2 +-
.../samza/test/integration/join/Checker.java | 36 ++++--------
.../samza/test/integration/join/Emitter.java | 4 --
.../samza/test/integration/join/Joiner.java | 1 -
9 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e3b9d30..275eb1a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -78,8 +78,9 @@ object SamzaContainer extends Logging {
// validate that we don't leak JMX non-daemon threads if we have an
// exception in the main method.
val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
+ logger.info("Got container ID: %s" format containerId)
val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
- logger.info("######### Coordinator URL in SafeMain() - " + coordinatorUrl)
+ logger.info("Got coordinator URL: %s" format coordinatorUrl)
val jobModel = readJobModel(coordinatorUrl)
val containerModel = jobModel.getContainers()(containerId.toInt)
val config = jobModel.getConfig
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cc0a4c6..f768263 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -137,6 +137,7 @@ class BrokerProxy(
}
while (!Thread.currentThread.isInterrupted) {
+ messageSink.refreshDropped
if (nextOffsets.size == 0) {
debug("No TopicPartitions to fetch. Sleeping.")
Thread.sleep(sleepMSWhileNoTopicPartitions)
@@ -271,9 +272,8 @@ class BrokerProxy(
override def toString() = "BrokerProxy for %s:%d" format (host, port)
def start {
- info("Starting " + toString)
-
if (!thread.isAlive) {
+ info("Starting " + toString)
thread.setDaemon(true)
thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
thread.start
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 4918e3e..38117e2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,8 @@ import org.apache.samza.util.TopicMetadataStore
import org.apache.samza.util.ExponentialSleepStrategy
import kafka.api.TopicMetadata
import org.apache.samza.util.ExponentialSleepStrategy
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions._
object KafkaSystemConsumer {
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -86,27 +88,21 @@ private[kafka] class KafkaSystemConsumer(
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
- var nextOffsets = Map[SystemStreamPartition, String]()
+ val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
var perPartitionFetchThreshold = fetchThreshold
def start() {
- if (nextOffsets.size > 0) {
- perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
+ if (topicPartitionsAndOffsets.size > 0) {
+ perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
}
- val topicPartitionsAndOffsets = nextOffsets.map {
- case (systemStreamPartition, offset) =>
- val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
- (topicAndPartition, offset)
- }
-
- refreshBrokers(topicPartitionsAndOffsets)
+ refreshBrokers
}
override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
super.register(systemStreamPartition, offset)
- nextOffsets += systemStreamPartition -> offset
+ topicPartitionsAndOffsets += KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) -> offset
metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
}
@@ -134,8 +130,9 @@ private[kafka] class KafkaSystemConsumer(
}
}
- def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
+ def refreshBrokers {
var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
+ info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
retryBackoff.run(
loop => {
val topics = tpToRefresh.map(_.topic).toSet
@@ -146,12 +143,27 @@ private[kafka] class KafkaSystemConsumer(
def refresh(tp: List[TopicAndPartition]) = {
val head :: rest = tpToRefresh
val nextOffset = topicPartitionsAndOffsets.get(head).get
- getHostPort(topicMetadata(head.topic), head.partition) match {
- case Some((host, port)) =>
- val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
- brokerProxy.addTopicPartition(head, Option(nextOffset))
- brokerProxy.start
- case None => warn("No such topic-partition: %s, dropping." format head)
+ // refreshBrokers can be called from abdicate and refreshDropped,
+ // both of which are triggered from BrokerProxy threads. To prevent
+ // accidentally creating multiple objects for the same broker, or
+ // accidentally not updating the topicPartitionsAndOffsets variable,
+ // we need to lock.
+ this.synchronized {
+ // Check if we still need this TopicAndPartition inside the
+ // critical section. If we don't, then skip it.
+ if (topicPartitionsAndOffsets.contains(head)) {
+ getHostPort(topicMetadata(head.topic), head.partition) match {
+ case Some((host, port)) =>
+ val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+ brokerProxy.addTopicPartition(head, Option(nextOffset))
+ brokerProxy.start
+ debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
+ topicPartitionsAndOffsets -= head
+ case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
+ }
+ } else {
+ debug("Ignoring refresh for %s because we already added it from another thread." format head)
+ }
}
rest
}
@@ -170,6 +182,15 @@ private[kafka] class KafkaSystemConsumer(
}
val sink = new MessageSink {
+ var lastDroppedRefresh = 0L
+
+ def refreshDropped() {
+ if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
+ refreshBrokers
+ lastDroppedRefresh = clock()
+ }
+ }
+
def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
}
@@ -202,7 +223,8 @@ private[kafka] class KafkaSystemConsumer(
def abdicate(tp: TopicAndPartition, nextOffset: Long) {
info("Abdicating for %s" format (tp))
- refreshBrokers(Map(tp -> nextOffset.toString))
+ topicPartitionsAndOffsets += tp -> nextOffset.toString
+ refreshBrokers
}
private def toSystemStreamPartition(tp: TopicAndPartition) = {
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
index 48ad66e..50d4746 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.samza.system.kafka
import kafka.common.TopicAndPartition
@@ -27,6 +28,8 @@ private[kafka] trait MessageSink {
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit
-
+
+ def refreshDropped(): Unit
+
def needsMoreMessages(tp: TopicAndPartition): Boolean
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 6f05f3c..d559d8b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -51,6 +51,8 @@ class TestBrokerProxy extends Logging {
val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+ def refreshDropped() {}
+
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) }
def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 2c0f803..2a84328 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -35,7 +35,7 @@ class TestKafkaSystemConsumer {
def testFetchThresholdShouldDivideEvenlyAmongPartitions {
val metadataStore = new MockMetadataStore
val consumer = new KafkaSystemConsumer("", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
- override def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
+ override def refreshBrokers {
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 1fef1ea..0598e51 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -35,11 +35,6 @@ import org.apache.samza.task.WindowableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
public class Checker implements StreamTask, WindowableTask, InitableTask {
private static Logger logger = LoggerFactory.getLogger(Checker.class);
@@ -98,26 +93,19 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
}
}
-/* private void checkEpoch(String epoch) {
+ private void checkEpoch(String epoch) {
String curr = this.store.get(CURRENT_EPOCH);
- if(curr == null)
+ if (curr == null)
this.store.put(CURRENT_EPOCH, epoch);
- else if(!curr.equals(epoch)) // should have curr > epoch
- throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
- }*/
- private void checkEpoch(String epoch) {
- String curr = this.store.get(CURRENT_EPOCH);
- if(curr == null)
- this.store.put(CURRENT_EPOCH, epoch);
- else {
- int currentEpochInStore = Integer.parseInt(curr);
- int currentEpochInMsg = Integer.parseInt(epoch);
- if (currentEpochInMsg <= currentEpochInStore) {
- if(currentEpochInMsg < currentEpochInStore)
- logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
- } else { // should have curr > epoch
- throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
- }
- }
+ else {
+ int currentEpochInStore = Integer.parseInt(curr);
+ int currentEpochInMsg = Integer.parseInt(epoch);
+ if (currentEpochInMsg <= currentEpochInStore) {
+ if (currentEpochInMsg < currentEpochInStore)
+ logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
+ } else { // should have curr > epoch
+ throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index e958b51..82a633d 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -88,10 +88,6 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
collector.send(envelope);
this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
}
-/* if(counter == max) {
- logger.info("###### Committing because we finished emitting counter in this epoch");
- coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
- }*/
}
private void resetEpoch() {
http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
index cb30838..d2c0c7e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
@@ -80,7 +80,6 @@ public class Joiner implements StreamTask, InitableTask {
if(partitions.partitions.size() == expected) {
logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), key, Integer.toString(epoch)));
-// logger.info("Completed key " + key + " for epoch " + epoch);
}
}
this.store.put(key, partitions.toString());