You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/02/28 03:57:24 UTC

samza git commit: SAMZA-579; don't drop SSPs when a topic metadata refresh fails in kafka system consumer

Repository: samza
Updated Branches:
  refs/heads/master 922771ea4 -> dd18d5ae4


SAMZA-579; don't drop SSPs when a topic metadata refresh fails in kafka system consumer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dd18d5ae
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dd18d5ae
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dd18d5ae

Branch: refs/heads/master
Commit: dd18d5ae4f4f8440534cbeffdc494b9a07d1f9e8
Parents: 922771e
Author: Chris Riccomini <cr...@apache.org>
Authored: Fri Feb 27 17:18:04 2015 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Feb 27 17:18:04 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  3 +-
 .../apache/samza/system/kafka/BrokerProxy.scala |  4 +-
 .../system/kafka/KafkaSystemConsumer.scala      | 60 +++++++++++++-------
 .../apache/samza/system/kafka/MessageSink.scala |  5 +-
 .../samza/system/kafka/TestBrokerProxy.scala    |  2 +
 .../system/kafka/TestKafkaSystemConsumer.scala  |  2 +-
 .../samza/test/integration/join/Checker.java    | 36 ++++--------
 .../samza/test/integration/join/Emitter.java    |  4 --
 .../samza/test/integration/join/Joiner.java     |  1 -
 9 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e3b9d30..275eb1a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -78,8 +78,9 @@ object SamzaContainer extends Logging {
     // validate that we don't leak JMX non-daemon threads if we have an
     // exception in the main method.
     val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
+    logger.info("Got container ID: %s" format containerId)
     val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
-    logger.info("######### Coordinator URL in SafeMain() - " + coordinatorUrl)
+    logger.info("Got coordinator URL: %s" format coordinatorUrl)
     val jobModel = readJobModel(coordinatorUrl)
     val containerModel = jobModel.getContainers()(containerId.toInt)
     val config = jobModel.getConfig

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cc0a4c6..f768263 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -137,6 +137,7 @@ class BrokerProxy(
             }
 
             while (!Thread.currentThread.isInterrupted) {
+              messageSink.refreshDropped
               if (nextOffsets.size == 0) {
                 debug("No TopicPartitions to fetch. Sleeping.")
                 Thread.sleep(sleepMSWhileNoTopicPartitions)
@@ -271,9 +272,8 @@ class BrokerProxy(
   override def toString() = "BrokerProxy for %s:%d" format (host, port)
 
   def start {
-    info("Starting " + toString)
-
     if (!thread.isAlive) {
+      info("Starting " + toString)
       thread.setDaemon(true)
       thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
       thread.start

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 4918e3e..38117e2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -38,6 +38,8 @@ import org.apache.samza.util.TopicMetadataStore
 import org.apache.samza.util.ExponentialSleepStrategy
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions._
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -86,27 +88,21 @@ private[kafka] class KafkaSystemConsumer(
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
-  var nextOffsets = Map[SystemStreamPartition, String]()
+  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
   var perPartitionFetchThreshold = fetchThreshold
 
   def start() {
-    if (nextOffsets.size > 0) {
-      perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
+    if (topicPartitionsAndOffsets.size > 0) {
+      perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
     }
 
-    val topicPartitionsAndOffsets = nextOffsets.map {
-      case (systemStreamPartition, offset) =>
-        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
-        (topicAndPartition, offset)
-    }
-
-    refreshBrokers(topicPartitionsAndOffsets)
+    refreshBrokers
   }
 
   override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     super.register(systemStreamPartition, offset)
 
-    nextOffsets += systemStreamPartition -> offset
+    topicPartitionsAndOffsets += KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) -> offset
 
     metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
   }
@@ -134,8 +130,9 @@ private[kafka] class KafkaSystemConsumer(
     }
   }
 
-  def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
+  def refreshBrokers {
     var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
+    info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
     retryBackoff.run(
       loop => {
         val topics = tpToRefresh.map(_.topic).toSet
@@ -146,12 +143,27 @@ private[kafka] class KafkaSystemConsumer(
         def refresh(tp: List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
           val nextOffset = topicPartitionsAndOffsets.get(head).get
-          getHostPort(topicMetadata(head.topic), head.partition) match {
-            case Some((host, port)) =>
-              val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
-              brokerProxy.addTopicPartition(head, Option(nextOffset))
-              brokerProxy.start
-            case None => warn("No such topic-partition: %s, dropping." format head)
+          // refreshBrokers can be called from abdicate and refreshDropped, 
+          // both of which are triggered from BrokerProxy threads. To prevent 
+          // accidentally creating multiple objects for the same broker, or 
+          // accidentally not updating the topicPartitionsAndOffsets variable, 
+          // we need to lock. 
+          this.synchronized {
+            // Check if we still need this TopicAndPartition inside the 
+            // critical section. If we don't, then skip it.
+            if (topicPartitionsAndOffsets.contains(head)) {
+              getHostPort(topicMetadata(head.topic), head.partition) match {
+                case Some((host, port)) =>
+                  val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+                  brokerProxy.addTopicPartition(head, Option(nextOffset))
+                  brokerProxy.start
+                  debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
+                  topicPartitionsAndOffsets -= head
+                case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
+              }
+            } else {
+              debug("Ignoring refresh for %s because we already added it from another thread." format head)
+            }
           }
           rest
         }
@@ -170,6 +182,15 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   val sink = new MessageSink {
+    var lastDroppedRefresh = 0L
+
+    def refreshDropped() {
+      if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
+        refreshBrokers
+        lastDroppedRefresh = clock()
+      }
+    }
+
     def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
       setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
     }
@@ -202,7 +223,8 @@ private[kafka] class KafkaSystemConsumer(
 
     def abdicate(tp: TopicAndPartition, nextOffset: Long) {
       info("Abdicating for %s" format (tp))
-      refreshBrokers(Map(tp -> nextOffset.toString))
+      topicPartitionsAndOffsets += tp -> nextOffset.toString
+      refreshBrokers
     }
 
     private def toSystemStreamPartition(tp: TopicAndPartition) = {

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
index 48ad66e..50d4746 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.system.kafka
 
 import kafka.common.TopicAndPartition
@@ -27,6 +28,8 @@ private[kafka] trait MessageSink {
   def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
 
   def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit
-  
+
+  def refreshDropped(): Unit
+
   def needsMoreMessages(tp: TopicAndPartition): Boolean
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 6f05f3c..d559d8b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -51,6 +51,8 @@ class TestBrokerProxy extends Logging {
       val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
       def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
 
+      def refreshDropped() {}
+
       def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) }
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 2c0f803..2a84328 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -35,7 +35,7 @@ class TestKafkaSystemConsumer {
   def testFetchThresholdShouldDivideEvenlyAmongPartitions {
     val metadataStore = new MockMetadataStore
     val consumer = new KafkaSystemConsumer("", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
-      override def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
+      override def refreshBrokers {
       }
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
index 1fef1ea..0598e51 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
@@ -35,11 +35,6 @@ import org.apache.samza.task.WindowableTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
 public class Checker implements StreamTask, WindowableTask, InitableTask {
   
   private static Logger logger = LoggerFactory.getLogger(Checker.class);
@@ -98,26 +93,19 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
     }
   }
   
-/*  private void checkEpoch(String epoch) {
+  private void checkEpoch(String epoch) {
     String curr = this.store.get(CURRENT_EPOCH);
-    if(curr == null)
+    if (curr == null)
       this.store.put(CURRENT_EPOCH, epoch);
-    else if(!curr.equals(epoch)) // should have curr > epoch
-      throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
-  }*/
-    private void checkEpoch(String epoch) {
-        String curr = this.store.get(CURRENT_EPOCH);
-        if(curr == null)
-            this.store.put(CURRENT_EPOCH, epoch);
-        else {
-            int currentEpochInStore = Integer.parseInt(curr);
-            int currentEpochInMsg = Integer.parseInt(epoch);
-            if (currentEpochInMsg <= currentEpochInStore) {
-                if(currentEpochInMsg < currentEpochInStore)
-                    logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
-            } else { // should have curr > epoch
-                throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
-            }
-        }
+    else {
+      int currentEpochInStore = Integer.parseInt(curr);
+      int currentEpochInMsg = Integer.parseInt(epoch);
+      if (currentEpochInMsg <= currentEpochInStore) {
+        if (currentEpochInMsg < currentEpochInStore)
+          logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
+      } else { // should have curr > epoch
+        throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
index e958b51..82a633d 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
@@ -88,10 +88,6 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
       collector.send(envelope);
       this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
     }
-/*    if(counter == max) {
-        logger.info("###### Committing because we finished emitting counter in this epoch");
-        coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
-    }*/
   }
   
   private void resetEpoch() {

http://git-wip-us.apache.org/repos/asf/samza/blob/dd18d5ae/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
index cb30838..d2c0c7e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
@@ -80,7 +80,6 @@ public class Joiner implements StreamTask, InitableTask {
       if(partitions.partitions.size() == expected) {
         logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
         collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), key, Integer.toString(epoch)));
-//        logger.info("Completed key " + key + " for epoch " + epoch);
       }
     }
     this.store.put(key, partitions.toString());