Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/23 20:20:28 UTC

[1/2] git commit: SAMZA-62; retry on offset request failures in kafka checkpoint manager.

Updated Branches:
  refs/heads/master dbe35de84 -> 4e6f1ca5d


SAMZA-62; retry on offset request failures in kafka checkpoint manager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/965bc933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/965bc933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/965bc933

Branch: refs/heads/master
Commit: 965bc9338ce8d59192e3abc350cfc7f62a87dbef
Parents: dbe35de
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Oct 23 11:11:23 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 23 11:11:23 2013 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/965bc933/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a9ddc5c..27b38b2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -130,10 +130,15 @@ class KafkaCheckpointManager(
           bufferSize,
           clientId)
         val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
-        val offset = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+        val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
           .partitionErrorAndOffsets
           .get(topicAndPartition)
           .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId)))
+
+        // Fail or retry if there was an issue with the offset request.
+        ErrorMapping.maybeThrowException(offsetResponse.error)
+
+        val offset = offsetResponse
           .offsets
           .headOption
           .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId)))
@@ -147,7 +152,7 @@ class KafkaCheckpointManager(
 
         val request = new FetchRequestBuilder()
           // Kafka returns 1 greater than the offset of the last message in 
-          //the topic, so subtract one to fetch the last message.
+          // the topic, so subtract one to fetch the last message.
           .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
           .maxWait(500)
           .minBytes(1)
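
The change above checks the per-partition error code before reading any offsets, so a failed offset request raises an exception rather than being treated as a valid response. The retry logic that the new comment refers to lies outside the lines shown in this diff, so the following is only a minimal, self-contained sketch of the pattern, assuming Scala 2.10 or later for scala.util.Try. PartitionOffsets, OffsetRequestException, maybeThrow, and retry are hypothetical stand-ins written for illustration; they are not the Kafka or Samza classes.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object OffsetRetrySketch {
  // Hypothetical stand-in for the per-partition part of an offset response.
  final case class PartitionOffsets(error: Short, offsets: Seq[Long])

  // Hypothetical exception type used to signal a failed offset request.
  final class OffsetRequestException(msg: String) extends RuntimeException(msg)

  val NoError: Short = 0

  // Hypothetical stand-in for an error-code check: throw on any non-zero code.
  def maybeThrow(code: Short): Unit =
    if (code != NoError)
      throw new OffsetRequestException("offset request failed with error code " + code)

  // Check the error code first, then take the first offset, as the diff does.
  def latestOffset(response: PartitionOffsets): Long = {
    maybeThrow(response.error)
    response.offsets.headOption.getOrElse(
      throw new OffsetRequestException("got response, but no offsets defined"))
  }

  // Assumed retry policy: re-run op on OffsetRequestException until attempts run out.
  @tailrec
  def retry[A](attemptsLeft: Int)(op: () => A): A =
    Try(op()) match {
      case Success(value) => value
      case Failure(_: OffsetRequestException) if attemptsLeft > 1 =>
        retry(attemptsLeft - 1)(op)
      case Failure(e) => throw e
    }

  def main(args: Array[String]): Unit = {
    // The first simulated response carries an error code; the second succeeds.
    val responses = Iterator(
      PartitionOffsets(6.toShort, Seq.empty[Long]),
      PartitionOffsets(NoError, Seq(42L)))
    val next = retry(3)(() => latestOffset(responses.next()))
    // The returned offset is one past the last message, so fetch from next - 1.
    println(next - 1)
  }
}
```

Throwing on the error code keeps the failure visible to whatever retry or fail-fast policy wraps the call, instead of surfacing later as a misleading "no offsets defined" message.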


[2/2] git commit: updating readme as part of SAMZA-9 YARN 2.2 upgrade.

Posted by cr...@apache.org.
updating readme as part of SAMZA-9 YARN 2.2 upgrade.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4e6f1ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4e6f1ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4e6f1ca5

Branch: refs/heads/master
Commit: 4e6f1ca5d9c68e5a0a3e14ff1d5089d9a184e1e9
Parents: 965bc93
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Oct 23 11:16:33 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 23 11:16:33 2013 -0700

----------------------------------------------------------------------
 README.md | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e6f1ca5/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index d55740e..3cc9e74 100644
--- a/README.md
+++ b/README.md
@@ -20,11 +20,9 @@ To build Samza, run:
 
 #### Scala and YARN
 
-Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.0.5-alpha, by default. Use the -PscalaVersion and -PyarnVersion switches to change versions. Samza supports building Scala with 2.8.1 or 2.9.2, and building YARN with 2.0.3-alpha, 2.0.4-alpha, and 2.0.5-alpha.
+Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.2.0, by default. Use the -PscalaVersion switch to change Scala versions. Samza supports building with Scala 2.8.1 or 2.9.2.
 
-    ./gradlew -PscalaVersion=2.8.1 -PyarnVersion=2.0.3-alpha clean build
-
-YARN protocols are backwards incompatible, so you must pick the version that matches your YARN grid.
+    ./gradlew -PscalaVersion=2.8.1 clean build
 
 ### Testing Samza