You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/23 20:23:02 UTC
samza git commit: SAMZA-855;
Upgrade Samza's Kafka client version to 0.10.0.1.
Repository: samza
Updated Branches:
refs/heads/master 6666ced0c -> c65802a98
SAMZA-855; Upgrade Samza's Kafka client version to 0.10.0.1.
This is based on PR 15, with merge conflicts resolved and the zkClient version set to 0.8 (compatible with Kafka version 0.10.0.1).
Tested and validated this patch against LinkedIn's internal Samza build; the results look good.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>
Closes #33 from shanthoosh/kafka_10_upgrade
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c65802a9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c65802a9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c65802a9
Branch: refs/heads/master
Commit: c65802a98ed8ee7c6481352387e42c58682dc067
Parents: 6666ced
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Fri Dec 23 12:22:55 2016 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Dec 23 12:22:55 2016 -0800
----------------------------------------------------------------------
build.gradle | 3 +
gradle/dependency-versions.gradle | 4 +-
.../kafka/KafkaCheckpointManager.scala | 7 +-
.../kafka/KafkaCheckpointManagerFactory.scala | 11 +-
.../samza/config/RegExTopicGenerator.scala | 6 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 11 +-
.../system/kafka/KafkaSystemConsumer.scala | 14 +--
.../samza/system/kafka/KafkaSystemFactory.scala | 9 +-
.../scala/org/apache/samza/util/KafkaUtil.scala | 4 +-
.../samza/system/kafka/MockKafkaProducer.java | 18 ++-
.../java/org/apache/samza/utils/TestUtils.java | 112 -------------------
.../kafka/TestKafkaCheckpointManager.scala | 101 ++++++++---------
.../system/kafka/TestKafkaSystemAdmin.scala | 98 ++++++++--------
.../system/kafka/TestKafkaSystemConsumer.scala | 10 +-
.../system/kafka/TestKafkaSystemProducer.scala | 8 +-
.../src/main/python/configs/downloads.json | 4 +-
samza-test/src/main/python/configs/kafka.json | 22 ++--
.../src/main/python/configs/zookeeper.json | 10 +-
.../test/integration/StreamTaskTestUtil.scala | 103 ++++++++---------
19 files changed, 220 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0c04931..5b41c52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -257,6 +257,7 @@ project(":samza-kafka_$scalaVersion") {
compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion"
compile "org.apache.kafka:kafka-clients:$kafkaVersion"
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
+ testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
@@ -648,6 +649,8 @@ project(":samza-test_$scalaVersion") {
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "com.101tec:zkclient:$zkClientVersion"
testCompile project(":samza-kafka_$scalaVersion")
+ testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
+ testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 1987745..976a49c 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,10 +25,10 @@
junitVersion = "4.8.1"
mockitoVersion = "1.8.4"
scalaTestVersion = "2.2.4"
- zkClientVersion = "0.3"
+ zkClientVersion = "0.8"
zookeeperVersion = "3.3.4"
metricsVersion = "2.2.0"
- kafkaVersion = "0.8.2.1"
+ kafkaVersion = "0.10.0.1"
commonsHttpClientVersion = "3.1"
rocksdbVersion = "3.13.1"
yarnVersion = "2.6.1"
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 8f18a92..6461f9d 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -27,8 +27,9 @@ import kafka.api._
import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException}
import kafka.consumer.SimpleConsumer
import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
+
+import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
@@ -56,7 +57,7 @@ class KafkaCheckpointManager(
fetchSize: Int,
val metadataStore: TopicMetadataStore,
connectProducer: () => Producer[Array[Byte], Array[Byte]],
- val connectZk: () => ZkClient,
+ val connectZk: () => ZkUtils,
systemStreamPartitionGrouperFactoryString: String,
failOnCheckpointValidation: Boolean,
val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 4e97376..c42882e 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -21,8 +21,7 @@ package org.apache.samza.checkpoint.kafka
import java.util.Properties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
@@ -40,8 +39,8 @@ object KafkaCheckpointManagerFactory {
"compression.type" -> "none")
// Set the checkpoint topic configs to have a very small segment size and
- // enable log compaction. This keeps job startup time small since there
- // are fewer useless (overwritten) messages to read from the checkpoint
+ // enable log compaction. This keeps job startup time small since there
+ // are fewer useless (overwritten) messages to read from the checkpoint
// topic.
def getCheckpointTopicProperties(config: Config) = {
val segmentBytes: Int = if (config == null) {
@@ -79,7 +78,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
val connectZk = () => {
- new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ ZkUtils(zkConnect, 6000, 6000, false)
}
val socketTimeout = consumerConfig.socketTimeoutMs
@@ -99,4 +98,4 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
config.failOnCheckpointValidation,
checkpointTopicProperties = getCheckpointTopicProperties(config))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index bcbad27..4e3b247 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -20,7 +20,7 @@
package org.apache.samza.config
import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ ZkUtils, ZKStringSerializer }
+import kafka.utils.ZkUtils
import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
import org.apache.samza.SamzaException
import org.apache.samza.util.Util
@@ -102,10 +102,10 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "")
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." format systemName))
- val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ val zkClient = new ZkClient(zkConnect, 6000, 6000)
try {
- ZkUtils.getAllTopics(zkClient)
+ ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics()
} finally {
zkClient.close()
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 5927cca..955fa44 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,14 +21,15 @@ package org.apache.samza.system.kafka
import java.util
-import org.I0Itec.zkclient.ZkClient
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging }
+import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging, KafkaUtil }
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.common.{ TopicExistsException, TopicAndPartition }
+import kafka.consumer.ConsumerConfig
+import kafka.utils.ZkUtils
import java.util.{ Properties, UUID }
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
@@ -37,6 +38,7 @@ import kafka.consumer.ConsumerConfig
import kafka.admin.AdminUtils
import org.apache.samza.util.KafkaUtil
+
object KafkaSystemAdmin extends Logging {
/**
* A helper method that takes oldest, newest, and upcoming offsets for each
@@ -97,10 +99,10 @@ class KafkaSystemAdmin(
brokerListString: String,
/**
- * A method that returns a ZkClient for the Kafka system. This is invoked
+ * A method that returns a ZkUtils for the Kafka system. This is invoked
* when the system admin is attempting to create a coordinator stream.
*/
- connectZk: () => ZkClient,
+ connectZk: () => ZkUtils,
/**
* Custom properties to use when the system admin tries to create a new
@@ -183,6 +185,7 @@ class KafkaSystemAdmin(
* Returns the offset for the message after the specified offset for each
* SystemStreamPartition that was passed in.
*/
+
override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
// This is safe to do with Kafka, even if a topic is key-deduped. If the
// offset doesn't exist on a compacted topic, Kafka will return the first
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 36567b6..fa685ee 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -24,7 +24,7 @@ import org.apache.samza.util.Logging
import kafka.message.Message
import kafka.message.MessageAndOffset
import org.apache.samza.Partition
-import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils
import org.apache.samza.util.Clock
import java.util.UUID
import kafka.serializer.DefaultDecoder
@@ -197,13 +197,13 @@ private[kafka] class KafkaSystemConsumer(
// This avoids trying to re-add the same topic partition repeatedly
def refresh(tp: List[TopicAndPartition]) = {
val head :: rest = tpToRefresh
- // refreshBrokers can be called from abdicate and refreshDropped,
- // both of which are triggered from BrokerProxy threads. To prevent
- // accidentally creating multiple objects for the same broker, or
- // accidentally not updating the topicPartitionsAndOffsets variable,
- // we need to lock.
+ // refreshBrokers can be called from abdicate and refreshDropped,
+ // both of which are triggered from BrokerProxy threads. To prevent
+ // accidentally creating multiple objects for the same broker, or
+ // accidentally not updating the topicPartitionsAndOffsets variable,
+ // we need to lock.
this.synchronized {
- // Check if we still need this TopicAndPartition inside the
+ // Check if we still need this TopicAndPartition inside the
// critical section. If we don't, then skip it.
topicPartitionsAndOffsets.get(head) match {
case Some(nextOffset) =>
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index b574176..d0e3089 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -20,6 +20,7 @@
package org.apache.samza.system.kafka
import java.util.Properties
+import kafka.utils.ZkUtils
import org.apache.samza.SamzaException
import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
import org.apache.samza.config.Config
@@ -29,8 +30,6 @@ import org.apache.samza.config.JobConfig.Config2Job
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
-import org.I0Itec.zkclient.ZkClient
-import kafka.utils.ZKStringSerializer
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemConsumer
@@ -90,8 +89,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) }
val metrics = new KafkaSystemProducerMetrics(systemName, registry)
- // Unlike consumer, no need to use encoders here, since they come for free
- // inside the producer configs. Kafka's producer will handle all of this
+ // Unlike consumer, no need to use encoders here, since they come for free
+ // inside the producer configs. Kafka's producer will handle all of this
// for us.
new KafkaSystemProducer(
@@ -111,7 +110,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val zkConnect = Option(consumerConfig.zkConnect)
.getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
val connectZk = () => {
- new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ ZkUtils(zkConnect, 6000, 6000, false)
}
val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index e95f052..0f0bc22 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -22,7 +22,7 @@ package org.apache.samza.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
import kafka.admin.AdminUtils
-import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
import org.apache.kafka.common.PartitionInfo
import org.apache.samza.config.Config
@@ -86,7 +86,7 @@ object KafkaUtil extends Logging {
}
class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- val connectZk: () => ZkClient) extends Logging {
+ val connectZk: () => ZkUtils) extends Logging {
/**
* Common code for creating a topic in Kafka
*
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
index 6f498de..aaa949d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -39,9 +39,9 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.utils.TestUtils;
import org.apache.kafka.common.MetricName;
-
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.test.TestUtils;
public class MockKafkaProducer implements Producer<byte[], byte[]> {
@@ -113,7 +113,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
}
private RecordMetadata getRecordMetadata(ProducerRecord record) {
- return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+ return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1);
}
@Override
@@ -174,6 +174,16 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
}
+ @Override
+ public void close(long timeout, TimeUnit timeUnit) {
+
+ }
+
+ public synchronized void flush () {
+
+ }
+
+
private static class FutureFailure implements Future<RecordMetadata> {
private final ExecutionException exception;
@@ -215,7 +225,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> {
public FutureSuccess(ProducerRecord record, int offset) {
this.record = record;
- this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+ this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
deleted file mode 100644
index 2fa743f..0000000
--- a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samza.utils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-
-import static java.util.Arrays.asList;
-
-
-/**
- * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved
- * Helper functions for writing unit tests
- */
-public class TestUtils {
-
- public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
-
- public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
- public static String DIGITS = "0123456789";
- public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
-
- /* A consistent random number generator to make tests repeatable */
- public static final Random seededRandom = new Random(192348092834L);
- public static final Random random = new Random();
-
- public static Cluster singletonCluster(String topic, int partitions) {
- return clusterWith(1, topic, partitions);
- }
-
- public static Cluster clusterWith(int nodes, String topic, int partitions) {
- Node[] ns = new Node[nodes];
- for (int i = 0; i < nodes; i++)
- ns[i] = new Node(0, "localhost", 1969);
- List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
- for (int i = 0; i < partitions; i++)
- parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
- return new Cluster(asList(ns), parts);
- }
-
- /**
- * Choose a number of random available ports
- */
- public static int[] choosePorts(int count) {
- try {
- ServerSocket[] sockets = new ServerSocket[count];
- int[] ports = new int[count];
- for (int i = 0; i < count; i++) {
- sockets[i] = new ServerSocket(0);
- ports[i] = sockets[i].getLocalPort();
- }
- for (int i = 0; i < count; i++)
- sockets[i].close();
- return ports;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Choose an available port
- */
- public static int choosePort() {
- return choosePorts(1)[0];
- }
-
- /**
- * Generate an array of random bytes
- *
- * @param size The size of the array
- */
- public static byte[] randomBytes(int size) {
- byte[] bytes = new byte[size];
- seededRandom.nextBytes(bytes);
- return bytes;
- }
-
- /**
- * Generate a random string of letters and digits of the given length
- *
- * @param len The length of the string
- * @return The random string
- */
- public static String randomString(int len) {
- StringBuilder b = new StringBuilder();
- for (int i = 0; i < len; i++)
- b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
- return b.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index e6815da..1f2f62f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -22,10 +22,11 @@ package org.apache.samza.checkpoint.kafka
import kafka.admin.AdminUtils
import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException}
import kafka.message.InvalidMessageException
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
+import kafka.server.{KafkaConfig, KafkaServer, ConfigType}
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.integration.KafkaServerTestHarness
+
+import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig}
@@ -41,71 +42,59 @@ import org.junit._
import scala.collection.JavaConversions._
import scala.collection._
-class TestKafkaCheckpointManager {
+class TestKafkaCheckpointManager extends KafkaServerTestHarness {
+
+ protected def numBrokers: Int = 3
+
+ def generateConfigs() = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+ props.map(KafkaConfig.fromProps)
+ }
val checkpointTopic = "checkpoint-topic"
val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- props1.put("controlled.shutdown.enable", "true")
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- props1.put("controlled.shutdown.enable", "true")
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- props1.put("controlled.shutdown.enable", "true")
-
- val config = new java.util.HashMap[String, Object]()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
- config.put("acks", "all")
- config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
- config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
- config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
- val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
+ val zkSecure = JaasUtils.isZkSecurityEnabled()
+
val partition = new Partition(0)
val partition2 = new Partition(1)
val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
+
+ var producerConfig: KafkaProducerConfig = null
+
var metadataStore: TopicMetadataStore = null
var failOnTopicValidation = true
val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
@Before
- def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
+ override def setUp {
+ super.setUp
+
+ TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update")
+
+ val config = new java.util.HashMap[String, Object]()
+ val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ config.put("acks", "all")
+ config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+ config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
+ config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+ producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
}
@After
- def afterCleanLogDirs {
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zookeeper.shutdown
+ override def tearDown() {
+ if (servers != null) {
+ servers.foreach(_.shutdown())
+ servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ }
+ super.tearDown
}
private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = {
@@ -127,7 +116,7 @@ class TestKafkaCheckpointManager {
private def createCheckpointTopic(cpTopic: String = checkpointTopic, partNum: Int = 1) = {
- val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
try {
AdminUtils.createTopic(
zkClient,
@@ -151,8 +140,8 @@ class TestKafkaCheckpointManager {
kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1)
// check that log compaction is enabled.
- val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
- val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
+ val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure)
+ val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
zkClient.close
assertEquals("compact", topicConfig.get("cleanup.policy"))
assertEquals("26214400", topicConfig.get("segment.bytes"))
@@ -242,7 +231,7 @@ class TestKafkaCheckpointManager {
fetchSize = 300 * 1024,
metadataStore = metadataStore,
connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
- connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer),
+ connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
@@ -261,7 +250,7 @@ class TestKafkaCheckpointManager {
fetchSize = 300 * 1024,
metadataStore = metadataStore,
connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
- connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
+ connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
serde = new InvalideSerde(exception),
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index f00405d..0e3c9b5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -28,10 +28,11 @@ import kafka.admin.AdminUtils
import kafka.common.{ErrorMapping, LeaderNotAvailableException}
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
-import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import kafka.utils.{TestUtils, ZkUtils}
+import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.security.JaasUtils
+
import org.apache.samza.Partition
import org.apache.samza.config.KafkaProducerConfig
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -42,54 +43,56 @@ import org.junit._
import scala.collection.JavaConversions._
-object TestKafkaSystemAdmin {
+
+object TestKafkaSystemAdmin extends KafkaServerTestHarness {
+
val SYSTEM = "kafka"
val TOPIC = "input"
val TOPIC2 = "input2"
val TOTAL_PARTITIONS = 50
val REPLICATION_FACTOR = 2
+ val zkSecure = JaasUtils.isZkSecurityEnabled()
+
+ protected def numBrokers: Int = 3
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
-
- val config = new util.HashMap[String, Object]()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("bootstrap.servers", brokers)
- config.put("acks", "all")
- config.put("serializer.class", "kafka.serializer.StringEncoder")
- val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
var metadataStore: TopicMetadataStore = null
+ var producerConfig: KafkaProducerConfig = null
+ var brokers: String = null
+
+ def generateConfigs() = {
+ val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+ props.map(KafkaConfig.fromProps)
+ }
@BeforeClass
- def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
- zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+ override def setUp {
+ super.setUp
+
+ val config = new java.util.HashMap[String, Object]()
+
+ brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+ config.put("bootstrap.servers", brokers)
+ config.put("acks", "all")
+ config.put("serializer.class", "kafka.serializer.StringEncoder")
+
+ producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
}
+ // Close the producer opened in setUp before the harness shuts the brokers down; super.tearDown runs even if close fails.
+ @AfterClass
+ override def tearDown {
+ try { if (producer != null) producer.close() } finally super.tearDown
+ }
+
+
def createTopic(topicName: String, partitionCount: Int) {
AdminUtils.createTopic(
- zkClient,
+ zkUtils,
topicName,
partitionCount,
REPLICATION_FACTOR)
@@ -133,21 +136,6 @@ object TestKafkaSystemAdmin {
Consumer.create(consumerConfig)
}
- @AfterClass
- def afterCleanLogDirs {
- producer.close()
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zkClient.close
- zookeeper.shutdown
- }
}
/**
@@ -158,7 +146,7 @@ class TestKafkaSystemAdmin {
import TestKafkaSystemAdmin._
// Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated
- val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer))
+ val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
@Test
def testShouldAssembleMetadata {
@@ -219,7 +207,7 @@ class TestKafkaSystemAdmin {
// Empty Kafka topics should have a next offset of 0.
assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
- // Add a new message to one of the partitions, and verify that it works as
+ // Add a new message to one of the partitions, and verify that it works as
// expected.
producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
@@ -285,7 +273,8 @@ class TestKafkaSystemAdmin {
@Test
def testShouldCreateCoordinatorStream {
val topic = "test-coordinator-stream"
- val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+ val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+
systemAdmin.createCoordinatorStream(topic)
validateTopic(topic, 1)
val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
@@ -296,9 +285,10 @@ class TestKafkaSystemAdmin {
assertEquals(3, partitionMetadata.replicas.size)
}
- class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
import kafka.api.TopicMetadata
var metadataCallCount = 0
+
// Simulate Kafka telling us that the leader for the topic is not available
override def getTopicMetadata(topics: Set[String]) = {
metadataCallCount += 1
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 3b3ed3d..8a5cbc2 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -67,7 +67,7 @@ class TestKafkaSystemConsumer {
val metrics = new KafkaSystemConsumerMetrics
// Lie and tell the store that the partition metadata is empty. We can't
// use partition metadata because it has Broker in its constructor, which
- // is package private to Kafka.
+ // is package private to Kafka.
val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
var hosts = List[String]()
var getHostPortCount = 0
@@ -81,7 +81,7 @@ class TestKafkaSystemConsumer {
override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
new BrokerProxy(host, port, systemName, "", metrics, sink) {
override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
- // Skip this since we normally do verification of offsets, which
+ // Skip this since we normally do verification of offsets, which
// tries to connect to Kafka. Rather than mock that, just forget it.
nextOffsets.size
}
@@ -159,13 +159,12 @@ class TestKafkaSystemConsumer {
val msg = Array[Byte](5, 112, 9, 126)
val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
- // 4 data + 14 Message overhead + 80 IncomingMessageEnvelope overhead
+ // 4 data + 22 Message overhead (v1: crc 4 + magic 1 + attributes 1 + timestamp 8 + key size 4 + value size 4) + 80 IncomingMessageEnvelope overhead = 106
consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354)
- assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+ assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
}
-
@Test
def testFetchThresholdBytesDisabled {
val metadataStore = new MockMetadataStore
@@ -190,4 +189,3 @@ class TestKafkaSystemConsumer {
class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index e4250e7..7331611 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -27,6 +27,7 @@ import java.util
import org.junit.Assert._
import org.scalatest.Assertions.intercept
import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException}
+import org.apache.kafka.test.MockSerializer
import org.apache.samza.SamzaException
@@ -37,13 +38,14 @@ class TestKafkaSystemProducer {
@Test
def testKafkaProducer {
+ val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer)
val systemProducer = new KafkaSystemProducer(systemName = "test",
- getProducer = () => { new MockProducer(true) },
- metrics = new KafkaSystemProducerMetrics)
+ getProducer = () => mockProducer,
+ metrics = new KafkaSystemProducerMetrics)
systemProducer.register("test")
systemProducer.start
systemProducer.send("test", someMessage)
- assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer].history().size())
+ assertEquals(1, mockProducer.history().size())
systemProducer.stop
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/downloads.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json
index 22694c0..c468156 100644
--- a/samza-test/src/main/python/configs/downloads.json
+++ b/samza-test/src/main/python/configs/downloads.json
@@ -1,5 +1,5 @@
{
- "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz",
- "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz",
+ "url_kafka": "https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz",
+ "url_zookeeper": "https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz",
"url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/kafka.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index ab2f346..ece2a34 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,21 +3,21 @@
"kafka_instance_0": "localhost"
},
"kafka_port": 9092,
- "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties",
- "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
+ "kafka_start_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.0.1/config/server.properties",
+ "kafka_stop_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh",
"kafka_install_path": "deploy/kafka",
- "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz",
+ "kafka_executable": "kafka_2.10-0.10.0.1.tgz",
"kafka_post_install_cmds": [
- "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
- "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties",
- "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties"
+ "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh",
+ "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.10-0.10.0.1/config/server.properties",
+ "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.10-0.10.0.1/config/server.properties"
],
"kafka_logs": [
"log-cleaner.log",
- "kafka_2.9.2-0.8.2.0/logs/controller.log",
- "kafka_2.9.2-0.8.2.0/logs/kafka-request.log",
- "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log",
- "kafka_2.9.2-0.8.2.0/logs/server.log",
- "kafka_2.9.2-0.8.2.0/logs/state-change.log"
+ "kafka_2.10-0.10.0.1/logs/controller.log",
+ "kafka_2.10-0.10.0.1/logs/kafka-request.log",
+ "kafka_2.10-0.10.0.1/logs/kafkaServer-gc.log",
+ "kafka_2.10-0.10.0.1/logs/server.log",
+ "kafka_2.10-0.10.0.1/logs/state-change.log"
]
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/zookeeper.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/zookeeper.json b/samza-test/src/main/python/configs/zookeeper.json
index 2762197..250bfce 100644
--- a/samza-test/src/main/python/configs/zookeeper.json
+++ b/samza-test/src/main/python/configs/zookeeper.json
@@ -2,13 +2,13 @@
"zookeeper_hosts": {
"zookeeper_instance_0": "localhost"
},
- "zookeeper_start_cmd": "zookeeper-3.4.3/bin/zkServer.sh start",
- "zookeeper_stop_cmd": "zookeeper-3.4.3/bin/zkServer.sh stop",
+ "zookeeper_start_cmd": "zookeeper-3.4.6/bin/zkServer.sh start",
+ "zookeeper_stop_cmd": "zookeeper-3.4.6/bin/zkServer.sh stop",
"zookeeper_install_path": "deploy/zookeeper",
- "zookeeper_executable": "zookeeper-3.4.3.tar.gz",
+ "zookeeper_executable": "zookeeper-3.4.6.tar.gz",
"zookeeper_post_install_cmds": [
- "cp zookeeper-3.4.3/conf/zoo_sample.cfg zookeeper-3.4.3/conf/zoo.cfg",
- "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.3/conf/zoo.cfg"
+ "cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg",
+ "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.6/conf/zoo.cfg"
],
"zookeeper_logs": [
"zookeeper.out"
http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 8d7e3fe..5d82b92 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -22,17 +22,19 @@ package org.apache.samza.test.integration
import java.util
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
import kafka.admin.AdminUtils
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.message.MessageAndMetadata
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer}
+import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
import kafka.zk.EmbeddedZookeeper
-import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
import org.apache.samza.Partition
import org.apache.samza.checkpoint.Checkpoint
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.JaasUtils
import org.apache.samza.config.{Config, KafkaProducerConfig, MapConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.job.local.ThreadJobFactory
@@ -44,7 +46,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMeta
import org.junit.Assert._
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap}
/*
* This creates an singleton instance of TestBaseStreamTask and implement the helper functions to
@@ -58,39 +60,19 @@ object StreamTaskTestUtil {
val TOTAL_TASK_NAMES = 1
val REPLICATION_FACTOR = 3
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- props1.setProperty("auto.create.topics.enable","false")
- props2.setProperty("auto.create.topics.enable","false")
- props3.setProperty("auto.create.topics.enable","false")
-
- val config = new util.HashMap[String, Object]()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put("bootstrap.servers", brokers)
- config.put("request.required.acks", "-1")
- config.put("serializer.class", "kafka.serializer.StringEncoder")
- config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
- config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
- val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+ var zkUtils: ZkUtils = null
+ var zookeeper: EmbeddedZookeeper = null
+ var brokers: String = null
+ def zkPort: Int = zookeeper.port
+ def zkConnect: String = s"127.0.0.1:$zkPort"
+
var producer: Producer[Array[Byte], Array[Byte]] = null
val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
+
var metadataStore: TopicMetadataStore = null
/*
@@ -107,8 +89,8 @@ object StreamTaskTestUtil {
"systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
"systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
"systems.kafka.samza.msg.serde" -> "string",
- "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
+ "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181",
+ "systems.kafka.producer.bootstrap.servers" -> "localhost:9092",
// Since using state, need a checkpoint manager
"task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
"task.checkpoint.system" -> "kafka",
@@ -122,12 +104,36 @@ object StreamTaskTestUtil {
TestTask.reset()
}
+ var servers: Buffer[KafkaServer] = null
+
def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
- zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
+ zookeeper = new EmbeddedZookeeper()
+ zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled())
+
+ val props = TestUtils.createBrokerConfigs(3, zkConnect, true)
+
+ val configs = props.map(p => {
+ p.setProperty("auto.create.topics.enable","false")
+ KafkaConfig.fromProps(p)
+ })
+
+ servers = configs.map(TestUtils.createServer(_)).toBuffer
+
+ val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT)
+ brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",")
+
+ jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect,
+ "systems.kafka.producer.bootstrap.servers" -> brokers)
+
+ val config = new util.HashMap[String, Object]()
+
+ config.put("bootstrap.servers", brokers)
+ config.put("request.required.acks", "-1")
+ config.put("serializer.class", "kafka.serializer.StringEncoder")
+ config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+ config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
+ val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+
producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
@@ -137,7 +143,7 @@ object StreamTaskTestUtil {
def createTopics {
AdminUtils.createTopic(
- zkClient,
+ zkUtils,
INPUT_TOPIC,
TOTAL_TASK_NAMES,
REPLICATION_FACTOR)
@@ -174,18 +180,15 @@ object StreamTaskTestUtil {
}
def afterCleanLogDirs {
- producer.close()
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zkClient.close
- zookeeper.shutdown
+ // Close the producer while brokers are still up; null guards keep a failed setup from being masked by an NPE here.
+ if (producer != null) CoreUtils.swallow(producer.close())
+ if (servers != null) servers.foreach(_.shutdown())
+ if (servers != null) servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+ if (zkUtils != null)
+ CoreUtils.swallow(zkUtils.close())
+ if (zookeeper != null)
+ CoreUtils.swallow(zookeeper.shutdown())
+ Configuration.setConfiguration(null)
}
}