You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:44 UTC
[12/36] samza git commit: SAMZA-600: Move Java tests into
src/test/java in samza-kafka
SAMZA-600: Move Java tests into src/test/java in samza-kafka
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/660d879e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/660d879e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/660d879e
Branch: refs/heads/samza-sql
Commit: 660d879e7f1218a7289c97d654e8371fd7da6392
Parents: 1202df4
Author: Benjamin Fradet <benjamin dot fradet at gmail dot com>
Authored: Sat Mar 14 14:54:06 2015 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Sat Mar 14 14:54:06 2015 -0700
----------------------------------------------------------------------
.../samza/system/kafka/MockKafkaProducer.java | 246 +++++++++++++++++++
.../java/org/apache/samza/utils/TestUtils.java | 112 +++++++++
.../samza/system/kafka/MockKafkaProducer.java | 246 -------------------
.../scala/org/apache/samza/utils/TestUtils.java | 112 ---------
4 files changed, 358 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
new file mode 100644
index 0000000..6f498de
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.utils.TestUtils;
+import org.apache.kafka.common.MetricName;
+
+
+public class MockKafkaProducer implements Producer<byte[], byte[]> {
+
+ private Cluster _cluster;
+ private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+ private boolean shouldBuffer = false;
+ private boolean errorNext = false;
+ private Exception exception = null;
+ private AtomicInteger msgsSent = new AtomicInteger(0);
+
+ /*
+ * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
+ * - Create a send that instantly succeeds & returns a successful future
+ * - Set error for the next message that is sent (using errorNext). In this case, the next call to send returns a
+ * future with exception.
+ * Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be
+ * manually set to true in the unit test, before expecting failure for another message.
+ * - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute them
+ * at a later point in time in a separate thread. This thread NEEDS to be triggered from the unit test itself
+ * using the "startDelayedSendThread" method
+ * - "Offset" in RecordMetadata is not guaranteed to be correct
+ */
+ public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
+ this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+ }
+
+ public void setShouldBuffer(boolean shouldBuffer) {
+ this.shouldBuffer = shouldBuffer;
+ }
+
+ public void setErrorNext(boolean errorNext, Exception exception) {
+ this.errorNext = errorNext;
+ this.exception = exception;
+ }
+
+ public int getMsgsSent() {
+ return this.msgsSent.get();
+ }
+
+ public Thread startDelayedSendThread(final int sleepTime) {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+ AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ try {
+ for(int i = 0; i < _bufferList.length(); i++) {
+ Thread.sleep(sleepTime);
+ FutureTask f = _bufferList.get(i);
+ if(!f.isDone()) {
+ executor.submit(f).get();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException ee) {
+ ee.printStackTrace();
+ }
+ }
+ });
+ t.start();
+ return t;
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord record) {
+ return send(record, null);
+ }
+
+ private RecordMetadata getRecordMetadata(ProducerRecord record) {
+ return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+ }
+
+ @Override
+ public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+ if (errorNext) {
+ if (shouldBuffer) {
+ FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+ @Override
+ public RecordMetadata call()
+ throws Exception {
+ callback.onCompletion(null, exception);
+ return getRecordMetadata(record);
+ }
+ });
+ _callbacksList.add(f);
+ this.errorNext = false;
+ return f;
+ } else {
+ callback.onCompletion(null, this.exception);
+ this.errorNext = false;
+ return new FutureFailure(this.exception);
+ }
+ } else {
+ if (shouldBuffer) {
+ FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+ @Override
+ public RecordMetadata call()
+ throws Exception {
+ msgsSent.incrementAndGet();
+ RecordMetadata metadata = getRecordMetadata(record);
+ callback.onCompletion(metadata, null);
+ return metadata;
+ }
+ });
+ _callbacksList.add(f);
+ return f;
+ } else {
+ int offset = msgsSent.incrementAndGet();
+ final RecordMetadata metadata = getRecordMetadata(record);
+ callback.onCompletion(metadata, null);
+ return new FutureSuccess(record, offset);
+ }
+ }
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this._cluster.partitionsForTopic(topic);
+ }
+
+ @Override
+ public Map<MetricName, Metric> metrics() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ private static class FutureFailure implements Future<RecordMetadata> {
+
+ private final ExecutionException exception;
+
+ public FutureFailure(Exception exception) {
+ this.exception = new ExecutionException(exception);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+ }
+
+ private static class FutureSuccess implements Future<RecordMetadata> {
+
+ private ProducerRecord record;
+ private final RecordMetadata _metadata;
+
+ public FutureSuccess(ProducerRecord record, int offset) {
+ this.record = record;
+ this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ return this._metadata;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ return this._metadata;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
new file mode 100644
index 0000000..2fa743f
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+import static java.util.Arrays.asList;
+
+
+/**
+ * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved
+ * Helper functions for writing unit tests
+ */
+public class TestUtils {
+
+ public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+ public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+ public static String DIGITS = "0123456789";
+ public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+ /* A consistent random number generator to make tests repeatable */
+ public static final Random seededRandom = new Random(192348092834L);
+ public static final Random random = new Random();
+
+ public static Cluster singletonCluster(String topic, int partitions) {
+ return clusterWith(1, topic, partitions);
+ }
+
+ public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ Node[] ns = new Node[nodes];
+ for (int i = 0; i < nodes; i++)
+ ns[i] = new Node(0, "localhost", 1969);
+ List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+ for (int i = 0; i < partitions; i++)
+ parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+ return new Cluster(asList(ns), parts);
+ }
+
+ /**
+ * Choose a number of random available ports
+ */
+ public static int[] choosePorts(int count) {
+ try {
+ ServerSocket[] sockets = new ServerSocket[count];
+ int[] ports = new int[count];
+ for (int i = 0; i < count; i++) {
+ sockets[i] = new ServerSocket(0);
+ ports[i] = sockets[i].getLocalPort();
+ }
+ for (int i = 0; i < count; i++)
+ sockets[i].close();
+ return ports;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Choose an available port
+ */
+ public static int choosePort() {
+ return choosePorts(1)[0];
+ }
+
+ /**
+ * Generate an array of random bytes
+ *
+ * @param size The size of the array
+ */
+ public static byte[] randomBytes(int size) {
+ byte[] bytes = new byte[size];
+ seededRandom.nextBytes(bytes);
+ return bytes;
+ }
+
+ /**
+ * Generate a random string of letters and digits of the given length
+ *
+ * @param len The length of the string
+ * @return The random string
+ */
+ public static String randomString(int len) {
+ StringBuilder b = new StringBuilder();
+ for (int i = 0; i < len; i++)
+ b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+ return b.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
deleted file mode 100644
index 6f498de..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.utils.TestUtils;
-import org.apache.kafka.common.MetricName;
-
-
-public class MockKafkaProducer implements Producer<byte[], byte[]> {
-
- private Cluster _cluster;
- private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
- private boolean shouldBuffer = false;
- private boolean errorNext = false;
- private Exception exception = null;
- private AtomicInteger msgsSent = new AtomicInteger(0);
-
- /*
- * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
- * - Create send that will instantly succeed & return a successful future
- * - Set error for the next message that is sent (using errorNext). In this case, the next call to send returns a
- * future with exception.
- * Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be
- * manually set to true in the unit test, before expecting failure for another message.
- * - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute it
- * at a later point of time in a separate thread. This thread NEEDS to be triggered from the unit test itself
- * using "startDelayedSendThread" method
- * - "Offset" in RecordMetadata is not guranteed to be correct
- */
- public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
- this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
- }
-
- public void setShouldBuffer(boolean shouldBuffer) {
- this.shouldBuffer = shouldBuffer;
- }
-
- public void setErrorNext(boolean errorNext, Exception exception) {
- this.errorNext = errorNext;
- this.exception = exception;
- }
-
- public int getMsgsSent() {
- return this.msgsSent.get();
- }
-
- public Thread startDelayedSendThread(final int sleepTime) {
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
- AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
- ExecutorService executor = Executors.newFixedThreadPool(10);
- try {
- for(int i = 0; i < _bufferList.length(); i++) {
- Thread.sleep(sleepTime);
- FutureTask f = _bufferList.get(i);
- if(!f.isDone()) {
- executor.submit(f).get();
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException ee) {
- ee.printStackTrace();
- }
- }
- });
- t.start();
- return t;
- }
-
- @Override
- public Future<RecordMetadata> send(ProducerRecord record) {
- return send(record, null);
- }
-
- private RecordMetadata getRecordMetadata(ProducerRecord record) {
- return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
- }
-
- @Override
- public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
- if (errorNext) {
- if (shouldBuffer) {
- FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
- @Override
- public RecordMetadata call()
- throws Exception {
- callback.onCompletion(null, exception);
- return getRecordMetadata(record);
- }
- });
- _callbacksList.add(f);
- this.errorNext = false;
- return f;
- } else {
- callback.onCompletion(null, this.exception);
- this.errorNext = false;
- return new FutureFailure(this.exception);
- }
- } else {
- if (shouldBuffer) {
- FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
- @Override
- public RecordMetadata call()
- throws Exception {
- msgsSent.incrementAndGet();
- RecordMetadata metadata = getRecordMetadata(record);
- callback.onCompletion(metadata, null);
- return metadata;
- }
- });
- _callbacksList.add(f);
- return f;
- } else {
- int offset = msgsSent.incrementAndGet();
- final RecordMetadata metadata = getRecordMetadata(record);
- callback.onCompletion(metadata, null);
- return new FutureSuccess(record, offset);
- }
- }
- }
-
- @Override
- public List<PartitionInfo> partitionsFor(String topic) {
- return this._cluster.partitionsForTopic(topic);
- }
-
- @Override
- public Map<MetricName, Metric> metrics() {
- return null;
- }
-
- @Override
- public void close() {
-
- }
-
- private static class FutureFailure implements Future<RecordMetadata> {
-
- private final ExecutionException exception;
-
- public FutureFailure(Exception exception) {
- this.exception = new ExecutionException(exception);
- }
-
- @Override
- public boolean cancel(boolean interrupt) {
- return false;
- }
-
- @Override
- public RecordMetadata get() throws ExecutionException {
- throw this.exception;
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
- throw this.exception;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
- }
-
- private static class FutureSuccess implements Future<RecordMetadata> {
-
- private ProducerRecord record;
- private final RecordMetadata _metadata;
-
- public FutureSuccess(ProducerRecord record, int offset) {
- this.record = record;
- this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
- }
-
- @Override
- public boolean cancel(boolean interrupt) {
- return false;
- }
-
- @Override
- public RecordMetadata get() throws ExecutionException {
- return this._metadata;
- }
-
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
- return this._metadata;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
deleted file mode 100644
index 2fa743f..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samza.utils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-
-import static java.util.Arrays.asList;
-
-
-/**
- * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved
- * Helper functions for writing unit tests
- */
-public class TestUtils {
-
- public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
-
- public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
- public static String DIGITS = "0123456789";
- public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
-
- /* A consistent random number generator to make tests repeatable */
- public static final Random seededRandom = new Random(192348092834L);
- public static final Random random = new Random();
-
- public static Cluster singletonCluster(String topic, int partitions) {
- return clusterWith(1, topic, partitions);
- }
-
- public static Cluster clusterWith(int nodes, String topic, int partitions) {
- Node[] ns = new Node[nodes];
- for (int i = 0; i < nodes; i++)
- ns[i] = new Node(0, "localhost", 1969);
- List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
- for (int i = 0; i < partitions; i++)
- parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
- return new Cluster(asList(ns), parts);
- }
-
- /**
- * Choose a number of random available ports
- */
- public static int[] choosePorts(int count) {
- try {
- ServerSocket[] sockets = new ServerSocket[count];
- int[] ports = new int[count];
- for (int i = 0; i < count; i++) {
- sockets[i] = new ServerSocket(0);
- ports[i] = sockets[i].getLocalPort();
- }
- for (int i = 0; i < count; i++)
- sockets[i].close();
- return ports;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Choose an available port
- */
- public static int choosePort() {
- return choosePorts(1)[0];
- }
-
- /**
- * Generate an array of random bytes
- *
- * @param size The size of the array
- */
- public static byte[] randomBytes(int size) {
- byte[] bytes = new byte[size];
- seededRandom.nextBytes(bytes);
- return bytes;
- }
-
- /**
- * Generate a random string of letters and digits of the given length
- *
- * @param len The length of the string
- * @return The random string
- */
- public static String randomString(int len) {
- StringBuilder b = new StringBuilder();
- for (int i = 0; i < len; i++)
- b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
- return b.toString();
- }
-
-}