You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:44 UTC

[12/36] samza git commit: SAMZA-600: Move Java tests into src/test/java in samza-kafka

SAMZA-600: Move Java tests into src/test/java in samza-kafka


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/660d879e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/660d879e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/660d879e

Branch: refs/heads/samza-sql
Commit: 660d879e7f1218a7289c97d654e8371fd7da6392
Parents: 1202df4
Author: Benjamin Fradet <benjamin dot fradet at gmail dot com>
Authored: Sat Mar 14 14:54:06 2015 -0700
Committer: Jakob Homan <jg...@gmail.com>
Committed: Sat Mar 14 14:54:06 2015 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/MockKafkaProducer.java   | 246 +++++++++++++++++++
 .../java/org/apache/samza/utils/TestUtils.java  | 112 +++++++++
 .../samza/system/kafka/MockKafkaProducer.java   | 246 -------------------
 .../scala/org/apache/samza/utils/TestUtils.java | 112 ---------
 4 files changed, 358 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
new file mode 100644
index 0000000..6f498de
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.utils.TestUtils;
+import org.apache.kafka.common.MetricName;
+
+
+public class MockKafkaProducer implements Producer<byte[], byte[]> {
+
+  private Cluster _cluster;
+  private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
+  private boolean shouldBuffer = false;
+  private boolean errorNext = false;
+  private Exception exception = null;
+  private AtomicInteger msgsSent = new AtomicInteger(0);
+
+  /*
+   * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
+   *  - Create a send that will instantly succeed & return a successful future
+   *  - Set an error for the next message that is sent (using errorNext). In this case, the next call to send returns a
+   *    future with an exception.
+   *    Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be
+   *    manually set to true in the unit test, before expecting failure for another message.
+   *  - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute them
+   *    at a later point of time in a separate thread. This thread NEEDS to be triggered from the unit test itself
+   *    using the "startDelayedSendThread" method
+   *  - "Offset" in RecordMetadata is not guaranteed to be correct
+   */
+  public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
+    this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
+  }
+
+  public void setShouldBuffer(boolean shouldBuffer) {
+    this.shouldBuffer = shouldBuffer;
+  }
+
+  public void setErrorNext(boolean errorNext, Exception exception) {
+    this.errorNext = errorNext;
+    this.exception = exception;
+  }
+
+  public int getMsgsSent() {
+    return this.msgsSent.get();
+  }
+
+  public Thread startDelayedSendThread(final int sleepTime) {
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
+        AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        try {
+          for(int i = 0; i < _bufferList.length(); i++) {
+            Thread.sleep(sleepTime);
+            FutureTask f = _bufferList.get(i);
+            if(!f.isDone()) {
+              executor.submit(f).get();
+            }
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        } catch (ExecutionException ee) {
+          ee.printStackTrace();
+        }
+      }
+    });
+    t.start();
+    return t;
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord record) {
+    return send(record, null);
+  }
+
+  private RecordMetadata getRecordMetadata(ProducerRecord record) {
+    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
+  }
+
+  @Override
+  public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+    if (errorNext) {
+      if (shouldBuffer) {
+        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+          @Override
+          public RecordMetadata call()
+              throws Exception {
+            callback.onCompletion(null, exception);
+            return getRecordMetadata(record);
+          }
+        });
+        _callbacksList.add(f);
+        this.errorNext = false;
+        return f;
+      } else {
+        callback.onCompletion(null, this.exception);
+        this.errorNext = false;
+        return new FutureFailure(this.exception);
+      }
+    } else {
+      if (shouldBuffer) {
+        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
+          @Override
+          public RecordMetadata call()
+              throws Exception {
+            msgsSent.incrementAndGet();
+            RecordMetadata metadata = getRecordMetadata(record);
+            callback.onCompletion(metadata, null);
+            return metadata;
+          }
+        });
+        _callbacksList.add(f);
+        return f;
+      } else {
+        int offset = msgsSent.incrementAndGet();
+        final RecordMetadata metadata = getRecordMetadata(record);
+        callback.onCompletion(metadata, null);
+        return new FutureSuccess(record, offset);
+      }
+    }
+  }
+
+  @Override
+  public List<PartitionInfo> partitionsFor(String topic) {
+    return this._cluster.partitionsForTopic(topic);
+  }
+
+  @Override
+  public Map<MetricName, Metric> metrics() {
+    return null;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  private static class FutureFailure implements Future<RecordMetadata> {
+
+    private final ExecutionException exception;
+
+    public FutureFailure(Exception exception) {
+      this.exception = new ExecutionException(exception);
+    }
+
+    @Override
+    public boolean cancel(boolean interrupt) {
+      return false;
+    }
+
+    @Override
+    public RecordMetadata get() throws ExecutionException {
+      throw this.exception;
+    }
+
+    @Override
+    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+      throw this.exception;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+  }
+
+  private static class FutureSuccess implements Future<RecordMetadata> {
+
+    private ProducerRecord record;
+    private final RecordMetadata _metadata;
+
+    public FutureSuccess(ProducerRecord record, int offset) {
+      this.record = record;
+      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
+    }
+
+    @Override
+    public boolean cancel(boolean interrupt) {
+      return false;
+    }
+
+    @Override
+    public RecordMetadata get() throws ExecutionException {
+      return this._metadata;
+    }
+
+    @Override
+    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+      return this._metadata;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
new file mode 100644
index 0000000..2fa743f
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samza.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+import static java.util.Arrays.asList;
+
+
+/**
+ * Copied from the :kafka-clients API as a workaround until KAFKA-1861 is resolved.
+ * Helper functions for writing unit tests.
+ */
+public class TestUtils {
+
+    public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+    public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    public static String DIGITS = "0123456789";
+    public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+    /* A consistent random number generator to make tests repeatable */
+    public static final Random seededRandom = new Random(192348092834L);
+    public static final Random random = new Random();
+
+    public static Cluster singletonCluster(String topic, int partitions) {
+        return clusterWith(1, topic, partitions);
+    }
+
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        Node[] ns = new Node[nodes];
+        for (int i = 0; i < nodes; i++)
+            ns[i] = new Node(0, "localhost", 1969);
+        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+        for (int i = 0; i < partitions; i++)
+            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        return new Cluster(asList(ns), parts);
+    }
+
+    /**
+     * Choose a number of random available ports
+     */
+    public static int[] choosePorts(int count) {
+        try {
+            ServerSocket[] sockets = new ServerSocket[count];
+            int[] ports = new int[count];
+            for (int i = 0; i < count; i++) {
+                sockets[i] = new ServerSocket(0);
+                ports[i] = sockets[i].getLocalPort();
+            }
+            for (int i = 0; i < count; i++)
+                sockets[i].close();
+            return ports;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Choose an available port
+     */
+    public static int choosePort() {
+        return choosePorts(1)[0];
+    }
+
+    /**
+     * Generate an array of random bytes
+     * 
+     * @param size The size of the array
+     */
+    public static byte[] randomBytes(int size) {
+        byte[] bytes = new byte[size];
+        seededRandom.nextBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Generate a random string of letters and digits of the given length
+     * 
+     * @param len The length of the string
+     * @return The random string
+     */
+    public static String randomString(int len) {
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < len; i++)
+            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+        return b.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
deleted file mode 100644
index 6f498de..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.samza.utils.TestUtils;
-import org.apache.kafka.common.MetricName;
-
-
-public class MockKafkaProducer implements Producer<byte[], byte[]> {
-
-  private Cluster _cluster;
-  private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>();
-  private boolean shouldBuffer = false;
-  private boolean errorNext = false;
-  private Exception exception = null;
-  private AtomicInteger msgsSent = new AtomicInteger(0);
-
-  /*
-   * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to:
-   *  - Create send that will instantly succeed & return a successful future
-   *  - Set error for the next message that is sent (using errorNext). In this case, the next call to send returns a
-   *    future with exception.
-   *    Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be
-   *    manually set to true in the unit test, before expecting failure for another message.
-   *  - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute it
-   *    at a later point of time in a separate thread. This thread NEEDS to be triggered from the unit test itself
-   *    using "startDelayedSendThread" method
-   *  - "Offset" in RecordMetadata is not guranteed to be correct
-   */
-  public MockKafkaProducer(int numNodes, String topicName, int numPartitions) {
-    this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions);
-  }
-
-  public void setShouldBuffer(boolean shouldBuffer) {
-    this.shouldBuffer = shouldBuffer;
-  }
-
-  public void setErrorNext(boolean errorNext, Exception exception) {
-    this.errorNext = errorNext;
-    this.exception = exception;
-  }
-
-  public int getMsgsSent() {
-    return this.msgsSent.get();
-  }
-
-  public Thread startDelayedSendThread(final int sleepTime) {
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        FutureTask[] callbackArray = new FutureTask[_callbacksList.size()];
-        AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray));
-        ExecutorService executor = Executors.newFixedThreadPool(10);
-        try {
-          for(int i = 0; i < _bufferList.length(); i++) {
-            Thread.sleep(sleepTime);
-            FutureTask f = _bufferList.get(i);
-            if(!f.isDone()) {
-              executor.submit(f).get();
-            }
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        } catch (ExecutionException ee) {
-          ee.printStackTrace();
-        }
-      }
-    });
-    t.start();
-    return t;
-  }
-
-  @Override
-  public Future<RecordMetadata> send(ProducerRecord record) {
-    return send(record, null);
-  }
-
-  private RecordMetadata getRecordMetadata(ProducerRecord record) {
-    return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get());
-  }
-
-  @Override
-  public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
-    if (errorNext) {
-      if (shouldBuffer) {
-        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
-          @Override
-          public RecordMetadata call()
-              throws Exception {
-            callback.onCompletion(null, exception);
-            return getRecordMetadata(record);
-          }
-        });
-        _callbacksList.add(f);
-        this.errorNext = false;
-        return f;
-      } else {
-        callback.onCompletion(null, this.exception);
-        this.errorNext = false;
-        return new FutureFailure(this.exception);
-      }
-    } else {
-      if (shouldBuffer) {
-        FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() {
-          @Override
-          public RecordMetadata call()
-              throws Exception {
-            msgsSent.incrementAndGet();
-            RecordMetadata metadata = getRecordMetadata(record);
-            callback.onCompletion(metadata, null);
-            return metadata;
-          }
-        });
-        _callbacksList.add(f);
-        return f;
-      } else {
-        int offset = msgsSent.incrementAndGet();
-        final RecordMetadata metadata = getRecordMetadata(record);
-        callback.onCompletion(metadata, null);
-        return new FutureSuccess(record, offset);
-      }
-    }
-  }
-
-  @Override
-  public List<PartitionInfo> partitionsFor(String topic) {
-    return this._cluster.partitionsForTopic(topic);
-  }
-
-  @Override
-  public Map<MetricName, Metric> metrics() {
-    return null;
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  private static class FutureFailure implements Future<RecordMetadata> {
-
-    private final ExecutionException exception;
-
-    public FutureFailure(Exception exception) {
-      this.exception = new ExecutionException(exception);
-    }
-
-    @Override
-    public boolean cancel(boolean interrupt) {
-      return false;
-    }
-
-    @Override
-    public RecordMetadata get() throws ExecutionException {
-      throw this.exception;
-    }
-
-    @Override
-    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
-      throw this.exception;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return true;
-    }
-  }
-
-  private static class FutureSuccess implements Future<RecordMetadata> {
-
-    private ProducerRecord record;
-    private final RecordMetadata _metadata;
-
-    public FutureSuccess(ProducerRecord record, int offset) {
-      this.record = record;
-      this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset);
-    }
-
-    @Override
-    public boolean cancel(boolean interrupt) {
-      return false;
-    }
-
-    @Override
-    public RecordMetadata get() throws ExecutionException {
-      return this._metadata;
-    }
-
-    @Override
-    public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
-      return this._metadata;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public boolean isDone() {
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
deleted file mode 100644
index 2fa743f..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samza.utils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-
-import static java.util.Arrays.asList;
-
-
-/**
- * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved
- * Helper functions for writing unit tests
- */
-public class TestUtils {
-
-    public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
-
-    public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
-    public static String DIGITS = "0123456789";
-    public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
-
-    /* A consistent random number generator to make tests repeatable */
-    public static final Random seededRandom = new Random(192348092834L);
-    public static final Random random = new Random();
-
-    public static Cluster singletonCluster(String topic, int partitions) {
-        return clusterWith(1, topic, partitions);
-    }
-
-    public static Cluster clusterWith(int nodes, String topic, int partitions) {
-        Node[] ns = new Node[nodes];
-        for (int i = 0; i < nodes; i++)
-            ns[i] = new Node(0, "localhost", 1969);
-        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
-        for (int i = 0; i < partitions; i++)
-            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
-        return new Cluster(asList(ns), parts);
-    }
-
-    /**
-     * Choose a number of random available ports
-     */
-    public static int[] choosePorts(int count) {
-        try {
-            ServerSocket[] sockets = new ServerSocket[count];
-            int[] ports = new int[count];
-            for (int i = 0; i < count; i++) {
-                sockets[i] = new ServerSocket(0);
-                ports[i] = sockets[i].getLocalPort();
-            }
-            for (int i = 0; i < count; i++)
-                sockets[i].close();
-            return ports;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Choose an available port
-     */
-    public static int choosePort() {
-        return choosePorts(1)[0];
-    }
-
-    /**
-     * Generate an array of random bytes
-     * 
-     * @param size The size of the array
-     */
-    public static byte[] randomBytes(int size) {
-        byte[] bytes = new byte[size];
-        seededRandom.nextBytes(bytes);
-        return bytes;
-    }
-
-    /**
-     * Generate a random string of letters and digits of the given length
-     * 
-     * @param len The length of the string
-     * @return The random string
-     */
-    public static String randomString(int len) {
-        StringBuilder b = new StringBuilder();
-        for (int i = 0; i < len; i++)
-            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
-        return b.toString();
-    }
-
-}