You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/23 20:20:28 UTC
[1/2] git commit: SAMZA-62;
retry on offset request failures in kafka checkpoint manager.
Updated Branches:
refs/heads/master dbe35de84 -> 4e6f1ca5d
SAMZA-62; retry on offset request failures in kafka checkpoint manager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/965bc933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/965bc933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/965bc933
Branch: refs/heads/master
Commit: 965bc9338ce8d59192e3abc350cfc7f62a87dbef
Parents: dbe35de
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Oct 23 11:11:23 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 23 11:11:23 2013 -0700
----------------------------------------------------------------------
.../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/965bc933/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a9ddc5c..27b38b2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -130,10 +130,15 @@ class KafkaCheckpointManager(
bufferSize,
clientId)
val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
- val offset = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+ val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
.partitionErrorAndOffsets
.get(topicAndPartition)
.getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId)))
+
+ // Fail or retry if there was an issue with the offset request.
+ ErrorMapping.maybeThrowException(offsetResponse.error)
+
+ val offset = offsetResponse
.offsets
.headOption
.getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId)))
@@ -147,7 +152,7 @@ class KafkaCheckpointManager(
val request = new FetchRequestBuilder()
// Kafka returns 1 greater than the offset of the last message in
- //the topic, so subtract one to fetch the last message.
+ // the topic, so subtract one to fetch the last message.
.addFetch(stateTopic, partitionId, offset - 1, fetchSize)
.maxWait(500)
.minBytes(1)
[2/2] git commit: updating readme as part of SAMZA-9 YARN 2.2 upgrade.
Posted by cr...@apache.org.
updating readme as part of SAMZA-9 YARN 2.2 upgrade.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4e6f1ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4e6f1ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4e6f1ca5
Branch: refs/heads/master
Commit: 4e6f1ca5d9c68e5a0a3e14ff1d5089d9a184e1e9
Parents: 965bc93
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Oct 23 11:16:33 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Oct 23 11:16:33 2013 -0700
----------------------------------------------------------------------
README.md | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e6f1ca5/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index d55740e..3cc9e74 100644
--- a/README.md
+++ b/README.md
@@ -20,11 +20,9 @@ To build Samza, run:
#### Scala and YARN
-Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.0.5-alpha, by default. Use the -PscalaVersion and -PyarnVersion switches to change versions. Samza supports building Scala with 2.8.1 or 2.9.2, and building YARN with 2.0.3-alpha, 2.0.4-alpha, and 2.0.5-alpha.
+Samza builds with [Scala](http://www.scala-lang.org/) 2.9.2 and [YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) 2.2.0, by default. Use the -PscalaVersion switch to change Scala versions. Samza supports building with Scala 2.8.1 or 2.9.2.
- ./gradlew -PscalaVersion=2.8.1 -PyarnVersion=2.0.3-alpha clean build
-
-YARN protocols are backwards incompatible, so you must pick the version that matches your YARN grid.
+ ./gradlew -PscalaVersion=2.8.1 clean build
### Testing Samza