You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/10 18:52:46 UTC
samza git commit: SAMZA-1530; Bump up Kafka dependency to 0.11
Repository: samza
Updated Branches:
refs/heads/master 07f28e948 -> a6540b4e3
SAMZA-1530; Bump up Kafka dependency to 0.11
Author: Dong Lin <li...@gmail.com>
Reviewers: Xinyu Liu <xi...@gmail.com>
Closes #395 from lindong28/SAMZA-1530
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6540b4e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6540b4e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6540b4e
Branch: refs/heads/master
Commit: a6540b4e3d1d3916210c50be9b4b2b4920f885bb
Parents: 07f28e9
Author: Dong Lin <li...@gmail.com>
Authored: Wed Jan 10 10:52:38 2018 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Jan 10 10:52:38 2018 -0800
----------------------------------------------------------------------
bin/check-all.sh | 2 +-
gradle/dependency-versions.gradle | 2 +-
.../apache/samza/system/kafka/BrokerProxy.scala | 13 ++++----
.../apache/samza/system/kafka/GetOffset.scala | 4 +--
.../samza/system/kafka/KafkaSystemAdmin.scala | 8 ++---
.../samza/system/kafka/TopicMetadataCache.scala | 2 +-
.../scala/org/apache/samza/util/KafkaUtil.scala | 8 +++--
.../samza/system/kafka/MockKafkaProducer.java | 25 +++++++++++++--
.../kafka/TestKafkaCheckpointManager.scala | 7 ++---
.../samza/system/kafka/TestBrokerProxy.scala | 9 +++---
.../system/kafka/TestKafkaSystemAdmin.scala | 26 ++++++----------
.../system/kafka/TestKafkaSystemConsumer.scala | 4 +--
.../system/kafka/TestTopicMetadataCache.scala | 32 +++++++++++---------
.../org/apache/samza/utils/TestKafkaUtil.scala | 7 +++--
.../processor/TestZkLocalApplicationRunner.java | 2 +-
.../test/integration/StreamTaskTestUtil.scala | 7 ++---
16 files changed, 86 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/bin/check-all.sh
----------------------------------------------------------------------
diff --git a/bin/check-all.sh b/bin/check-all.sh
index 2f9f03c..f168bc8 100755
--- a/bin/check-all.sh
+++ b/bin/check-all.sh
@@ -21,7 +21,7 @@
set -e
-SCALAs=( "2.10" "2.11" "2.12" )
+SCALAs=( "2.11" "2.12" )
JDKs=( "JAVA8_HOME" )
YARNs=( "2.6.1" "2.7.1" )
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 20a1d56..2e45914 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -33,7 +33,7 @@
jodaTimeVersion = "2.2"
joptSimpleVersion = "3.2"
junitVersion = "4.8.1"
- kafkaVersion = "0.10.1.1"
+ kafkaVersion = "0.11.0.2"
log4jVersion = "1.2.17"
metricsVersion = "2.2.0"
mockitoVersion = "1.10.19"
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 5338886..8a6618d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -182,7 +182,7 @@ class BrokerProxy(
firstCallBarrier.countDown()
// Split response into errors and non errors, processing the errors first
- val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError)
+ val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError)
handleErrors(errorResponses, response)
@@ -219,18 +219,17 @@ class BrokerProxy(
immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
}
- def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = {
+ def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = {
// FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
- case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
+ case class Error(tp: TopicAndPartition, code: Short, exception: Exception)
// Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
// Convert FetchResponse into easier-to-work-with Errors
val errors = for (
(topicAndPartition, responseData) <- errorResponses;
- errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values...
- exception <- Option(ErrorMapping.exceptionFor(errorCode))
- ) yield new Error(topicAndPartition, errorCode, exception)
+ error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values...
+ ) yield new Error(topicAndPartition, error.code(), error.exception())
val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
@@ -241,7 +240,7 @@ class BrokerProxy(
// handle the recoverable errors.
remainingErrors.foreach(e => {
warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
- KafkaUtil.maybeThrowException(e.code) })
+ KafkaUtil.maybeThrowException(e.exception) })
notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp))
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
index 5528702..040e246 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala
@@ -60,7 +60,7 @@ class GetOffset(
val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
if (messages.hasError) {
- KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
+ KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
}
info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
@@ -86,7 +86,7 @@ class GetOffset(
.get(topicAndPartition)
.getOrElse(toss("Unable to find offset information for %s" format topicAndPartition))
- KafkaUtil.maybeThrowException(partitionOffsetResponse.error)
+ KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception())
partitionOffsetResponse
.offsets
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 013b292..4715141 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -164,7 +164,7 @@ class KafkaSystemAdmin(
metadataTTL)
val result = metadata.map {
case (topic, topicMetadata) => {
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.error.exception())
val partitionsMap = topicMetadata.partitionsMetadata.map {
pm =>
new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
@@ -350,7 +350,7 @@ class KafkaSystemAdmin(
.values
// Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
.flatMap(topicMetadata => {
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.error.exception())
topicMetadata
.partitionsMetadata
// Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)]
@@ -390,7 +390,7 @@ class KafkaSystemAdmin(
.getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
.partitionErrorAndOffsets
.mapValues(partitionErrorAndOffset => {
- KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
+ KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception())
partitionErrorAndOffset.offsets.head
})
@@ -480,7 +480,7 @@ class KafkaSystemAdmin(
val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL)
val topicMetadata = topicMetadataMap(topicName)
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.error.exception())
val partitionCount = topicMetadata.partitionsMetadata.length
if (partitionCount != spec.getPartitionCount) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
index 82ecf1a..8a3ab2b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
@@ -73,6 +73,6 @@ object TopicMetadataCache extends Logging {
* partition's metadata has a bad errorCode.
*/
def hasBadErrorCode(streamMetadata: TopicMetadata) = {
- KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode))
+ KafkaUtil.isBadErrorCode(streamMetadata.error.code()) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.error.code()))
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 1410cbb..5b0137a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -29,7 +29,8 @@ import org.apache.samza.config.{ApplicationConfig, Config, ConfigException}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.execution.StreamManager
import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import kafka.common.ErrorMapping
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.samza.system.kafka.TopicMetadataCache
@@ -71,9 +72,10 @@ object KafkaUtil extends Logging {
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol
* docs</a>, ReplicaNotAvailableException can be safely ignored.
*/
- def maybeThrowException(code: Short) {
+ def maybeThrowException(e: Exception) {
try {
- ErrorMapping.maybeThrowException(code)
+ if (e != null)
+ throw e
} catch {
case e: ReplicaNotAvailableException =>
debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.")
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 024c6e6..e66b7c3 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import kafka.producer.ProducerClosedException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,6 +43,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.test.TestUtils;
public class MockKafkaProducer implements Producer<byte[], byte[]> {
@@ -98,7 +100,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
}
private RecordMetadata getRecordMetadata(ProducerRecord record) {
- return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1);
+ return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), -1L, -1, -1, -1);
}
@Override
@@ -190,6 +192,25 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
new FlushRunnable(0).run();
}
+ public void initTransactions() {
+
+ }
+
+ public void abortTransaction() {
+
+ }
+
+ public void beginTransaction() {
+
+ }
+
+ public void commitTransaction() {
+
+ }
+
+ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
+
+ }
private static class FutureFailure implements Future<RecordMetadata> {
@@ -232,7 +253,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
public FutureSuccess(ProducerRecord record, int offset) {
this.record = record;
- this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1);
+ this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1, -1, -1);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index ec9f3a0..86cb418 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -52,15 +52,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val checkpoint1 = new Checkpoint(ImmutableMap.of(ssp, "offset-1"))
val checkpoint2 = new Checkpoint(ImmutableMap.of(ssp, "offset-2"))
val taskName = new TaskName("Partition 0")
-
- var brokers: String = null
var config: Config = null
@Before
override def setUp {
super.setUp
TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
- brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
config = getConfig()
}
@@ -140,7 +137,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val defaultSerializer = classOf[ByteArraySerializer].getCanonicalName
val props = new Properties()
props.putAll(ImmutableMap.of(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers,
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, defaultSerializer,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, defaultSerializer))
props
@@ -151,7 +148,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
.put(JobConfig.JOB_NAME, "some-job-name")
.put(JobConfig.JOB_ID, "i001")
.put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName)
- .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers)
+ .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList)
.put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect)
.put("task.checkpoint.system", checkpointSystemName)
.build())
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index f0bdafd..d510076 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -24,9 +24,10 @@ import java.nio.ByteBuffer
import java.util.concurrent.CountDownLatch
import kafka.api.{PartitionOffsetsResponse, _}
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
import org.apache.samza.SamzaException
import org.apache.samza.util.Logging
import org.junit.Assert._
@@ -165,7 +166,7 @@ class TestBrokerProxy extends Logging {
messageSet
}
- val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet)
+ val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
when(fetchResponse.data).thenReturn(map.toSeq)
@@ -257,12 +258,12 @@ class TestBrokerProxy extends Logging {
}
val mfr = mock(classOf[FetchResponse])
when(mfr.hasError).thenReturn(true)
- when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+ when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
val messageSet = mock(classOf[MessageSet])
when(messageSet.iterator).thenReturn(Iterator.empty)
val response = mock(classOf[FetchResponsePartitionData])
- when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
+ when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
val responseMap = Map(tp -> response)
when(mfr.data).thenReturn(responseMap.toSeq)
invocationCount += 1
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 65c43f5..2039447 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -24,7 +24,8 @@ package org.apache.samza.system.kafka
import java.util.{Properties, UUID}
import kafka.admin.AdminUtils
-import kafka.common.{ErrorMapping, LeaderNotAvailableException}
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
@@ -68,19 +69,13 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
@BeforeClass
override def setUp {
super.setUp
-
val config = new java.util.HashMap[String, String]()
-
- brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
-
- config.put("bootstrap.servers", brokers)
+ config.put("bootstrap.servers", brokerList)
config.put("acks", "all")
config.put("serializer.class", "kafka.serializer.StringEncoder")
-
producerConfig = new KafkaProducerConfig("kafka", "i001", config)
-
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
- metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
+ metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
}
@@ -107,9 +102,8 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
try {
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
val topicMetadata = topicMetadataMap(topic)
- val errorCode = topicMetadata.errorCode
- KafkaUtil.maybeThrowException(errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.error.exception())
done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
} catch {
@@ -137,11 +131,11 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
}
def createSystemAdmin: KafkaSystemAdmin = {
- new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+ new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
}
def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
- new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
+ new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map())
}
}
@@ -281,7 +275,7 @@ class TestKafkaSystemAdmin {
@Test
def testShouldCreateCoordinatorStream {
val topic = "test-coordinator-stream"
- val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+ val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
systemAdmin.createStream(spec)
@@ -294,14 +288,14 @@ class TestKafkaSystemAdmin {
assertEquals(3, partitionMetadata.replicas.size)
}
- class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
import kafka.api.TopicMetadata
var metadataCallCount = 0
// Simulate Kafka telling us that the leader for the topic is not available
override def getTopicMetadata(topics: Set[String]) = {
metadataCallCount += 1
- val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
+ val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
Map("quux" -> topicMetadata)
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 8a5cbc2..4dd170f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -25,7 +25,7 @@ import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.message.Message
import kafka.message.MessageAndOffset
-
+import org.apache.kafka.common.protocol.Errors
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
@@ -68,7 +68,7 @@ class TestKafkaSystemConsumer {
// Lie and tell the store that the partition metadata is empty. We can't
// use partition metadata because it has Broker in its constructor, which
// is package private to Kafka.
- val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
+ val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
var hosts = List[String]()
var getHostPortCount = 0
val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index 50c13ab..9cc2f63 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -20,7 +20,8 @@
package org.apache.samza.system.kafka
import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
import kafka.api.TopicMetadata
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.util.Clock
@@ -30,6 +31,7 @@ import org.junit.Before
import org.junit.Test
import kafka.common.ErrorMapping
import kafka.api.PartitionMetadata
+import org.apache.kafka.common.protocol.Errors
class TestTopicMetadataCache {
@@ -41,8 +43,8 @@ class TestTopicMetadataCache {
class MockTopicMetadataStore extends TopicMetadataStore {
var mockCache = Map(
- "topic1" -> new TopicMetadata("topic1", List.empty, 0),
- "topic2" -> new TopicMetadata("topic2", List.empty, 0))
+ "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE),
+ "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE))
var numberOfCalls: AtomicInteger = new AtomicInteger(0)
def getTopicInfo(topics: Set[String]) = {
@@ -53,7 +55,7 @@ class TestTopicMetadataCache {
}
def setErrorCode(topic: String, errorCode: Short) {
- mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode)
+ mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode))
}
}
@@ -70,7 +72,7 @@ class TestTopicMetadataCache {
mockStore.setErrorCode("topic1", 3)
var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
assertEquals("topic1", metadata("topic1").topic)
- assertEquals(3, metadata("topic1").errorCode)
+ assertEquals(3, metadata("topic1").error.code)
assertEquals(1, mockStore.numberOfCalls.get())
// Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache
@@ -78,21 +80,21 @@ class TestTopicMetadataCache {
mockStore.setErrorCode("topic1", 0)
metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
assertEquals("topic1", metadata("topic1").topic)
- assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(0, metadata("topic1").error.code)
assertEquals(2, mockStore.numberOfCalls.get())
// Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not
// called
metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
assertEquals("topic1", metadata("topic1").topic)
- assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(0, metadata("topic1").error.code)
assertEquals(2, mockStore.numberOfCalls.get())
// Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called
mockTime.currentValue = 11
metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
assertEquals("topic1", metadata("topic1").topic)
- assertEquals(0, metadata("topic1").errorCode)
+ assertEquals(0, metadata("topic1").error.code)
assertEquals(3, mockStore.numberOfCalls.get())
}
@@ -113,7 +115,7 @@ class TestTopicMetadataCache {
waitForThreadStart.await()
val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
- numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0)
+ numAssertionSuccess.compareAndSet(true, metadata("topic1").error.code == 0)
}
})
threads(i).start()
@@ -127,11 +129,11 @@ class TestTopicMetadataCache {
@Test
def testBadErrorCodes {
- val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode)
- val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError)
- assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode)))
- assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError)))
- assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError)))
- assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError)))
+ val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+ val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE)
+ assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT)))
+ assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE)))
+ assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE)))
+ assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE)))
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
index 848cfc8..3548aea 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala
@@ -20,14 +20,15 @@
package org.apache.samza.utils
import org.junit.Test
-import org.scalatest.{ Matchers => ScalaTestMatchers }
+import org.scalatest.{Matchers => ScalaTestMatchers}
import org.apache.samza.util.KafkaUtil
import kafka.common.ErrorMapping
+import org.apache.kafka.common.protocol.Errors
class TestKafkaUtil extends ScalaTestMatchers {
@Test
def testMaybeThrowException {
- intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) }
- KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode)
+ intercept[Exception] { KafkaUtil.maybeThrowException(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()) }
+ KafkaUtil.maybeThrowException(Errors.REPLICA_NOT_AVAILABLE.exception())
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9c5dad5..97fe1f8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -541,7 +541,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
- MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000"));
+ MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.MAX_BLOCK_MS_CONFIG), "1000"));
MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig));
MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig));
LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1);
http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4ba51f3..d9261ad 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -117,9 +117,7 @@ object StreamTaskTestUtil {
})
servers = configs.map(TestUtils.createServer(_)).toBuffer
-
- val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
- brokers = brokerList.split(",").map(p => "127.0.0.1" + p).mkString(",")
+ brokers = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
// setup the zookeeper and bootstrap servers for local kafka cluster
jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
@@ -161,9 +159,8 @@ object StreamTaskTestUtil {
topics.foreach(topic => {
val topicMetadata = topicMetadataMap(topic)
- val errorCode = topicMetadata.errorCode
- KafkaUtil.maybeThrowException(errorCode)
+ KafkaUtil.maybeThrowException(topicMetadata.error.exception())
})
done = true