You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/09/16 18:29:33 UTC

[1/2] SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods

Repository: incubator-samza
Updated Branches:
  refs/heads/master cb40a5986 -> 811f2897c


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/serializers/TestKafkaSerde.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/serializers/TestKafkaSerde.scala b/samza-kafka/src/test/scala/org/apache/samza/serializers/TestKafkaSerde.scala
index f6fa6bb..35bf688 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/serializers/TestKafkaSerde.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/serializers/TestKafkaSerde.scala
@@ -18,15 +18,17 @@
  */
 
 package org.apache.samza.serializers
-import org.junit.Assert._
-import org.junit.Test
+
 import kafka.serializer.StringEncoder
 import kafka.serializer.StringDecoder
 
+import org.junit.Assert._
+import org.junit.Test
+
 class TestKafkaSerde {
   @Test
   def testKafkaSerdeShouldWrapEncoderAndDecoders {
     val serde = new KafkaSerde(new StringEncoder, new StringDecoder)
-    serde.fromBytes(serde.toBytes("test")).equals("test")
+    assertEquals("test", serde.fromBytes(serde.toBytes("test")))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 9143e6c..6d01071 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -20,25 +20,27 @@
  */
 package org.apache.samza.system.kafka
 
+import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
+
+import kafka.api._
+import kafka.api.PartitionOffsetsResponse
+import kafka.common.ErrorMapping
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
+
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Logging
 import org.junit._
 import org.junit.Assert._
 import org.mockito.{Matchers, Mockito}
-import scala.collection.JavaConversions._
-import kafka.consumer.SimpleConsumer
 import org.mockito.Mockito._
 import org.mockito.Matchers._
-import kafka.api._
-import kafka.message.{MessageSet, Message, MessageAndOffset, ByteBufferMessageSet}
-import kafka.common.TopicAndPartition
-import kafka.api.PartitionOffsetsResponse
-import java.nio.ByteBuffer
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
-import kafka.common.ErrorMapping
 import org.mockito.stubbing.Answer
 import org.mockito.invocation.InvocationOnMock
-import java.util.concurrent.CountDownLatch
 
+import scala.collection.JavaConversions._
 
 class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
@@ -177,7 +179,7 @@ class TestBrokerProxy extends Logging {
     Thread.sleep(1000)
     assertEquals(0, sink.receivedMessages.size)
     assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
-    assertTrue(bp.metrics.brokerReads(bp.host, bp.port).getCount == 0)
+    assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
   }
 
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
index 6a9ebca..b959348 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala
@@ -19,17 +19,19 @@
 
 package org.apache.samza.system.kafka
 
-import org.junit._
-import org.junit.Assert._
+import java.nio.ByteBuffer
+
 import kafka.api._
 import kafka.common.TopicAndPartition
-import org.mockito.{ Matchers, Mockito }
-import org.mockito.Mockito._
-import org.mockito.Matchers._
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.message.ByteBufferMessageSet
-import java.nio.ByteBuffer
+
+import org.junit._
+import org.junit.Assert._
+import org.mockito.{ Matchers, Mockito }
+import org.mockito.Mockito._
+import org.mockito.Matchers._
 
 class TestGetOffset {
 
@@ -80,4 +82,4 @@ class TestGetOffset {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index be1670c..5ceb109 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,32 +21,35 @@
 
 package org.apache.samza.system.kafka
 
-import org.junit.Assert._
-import org.junit.{Test, BeforeClass, AfterClass}
-import kafka.zk.EmbeddedZookeeper
-import org.apache.samza.util.ClientUtilTopicMetadataStore
-import org.I0Itec.zkclient.ZkClient
+import java.util.Properties
+
 import kafka.admin.AdminUtils
-import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.Partition
-import kafka.producer.ProducerConfig
-import kafka.utils.TestUtils
 import kafka.common.ErrorMapping
-import kafka.utils.TestZKUtils
-import kafka.server.KafkaServer
+import kafka.consumer.Consumer
+import kafka.consumer.ConsumerConfig
+import kafka.producer.KeyedMessage
 import kafka.producer.Producer
+import kafka.producer.ProducerConfig
 import kafka.server.KafkaConfig
+import kafka.server.KafkaServer
+import kafka.utils.TestUtils
+import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
-import scala.collection.JavaConversions._
-import kafka.producer.KeyedMessage
-import kafka.consumer.Consumer
-import kafka.consumer.ConsumerConfig
-import java.util.Properties
-import org.apache.samza.system.SystemStreamPartition
+import kafka.zk.EmbeddedZookeeper
+
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.ClientUtilTopicMetadataStore
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.{Test, BeforeClass, AfterClass}
+
+import scala.collection.JavaConversions._
 
 object TestKafkaSystemAdmin {
   val TOPIC = "input"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 23e3e35..2c0f803 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.system.kafka
 
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import kafka.common.TopicAndPartition
-import org.apache.samza.util.TopicMetadataStore
 import kafka.api.TopicMetadata
 import kafka.api.PartitionMetadata
 import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Test
+import org.junit.Assert._
 
 class TestKafkaSystemConsumer {
   @Test
@@ -92,4 +93,4 @@ class TestKafkaSystemConsumer {
 
 class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
   def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index aba39c0..8067cbf 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -19,13 +19,14 @@
 
 package org.apache.samza.system.kafka
 
-import scala.collection.JavaConversions._
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStream
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
 
 class TestKafkaSystemFactory {
   @Test
@@ -73,13 +74,13 @@ class TestKafkaSystemFactory {
       "test",
       config,
       new MetricsRegistryMap)
-    assertTrue(producer != null)
+    assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
     producer = producerFactory.getProducer(
       "test",
       config,
       new MetricsRegistryMap)
-    assertTrue(producer != null)
+    assertNotNull(producer)
     assertTrue(producer.isInstanceOf[KafkaSystemProducer])
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
index 3684db5..72b36f7 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
@@ -18,22 +18,26 @@
  */
 
 package org.apache.samza.system.kafka
-import org.junit.Assert._
-import org.junit.Test
-import kafka.producer.Producer
-import kafka.producer.async.DefaultEventHandler
-import kafka.producer.ProducerPool
-import kafka.serializer.Encoder
+
 import java.nio.ByteBuffer
-import kafka.producer.ProducerConfig
 import java.util.Properties
-import scala.collection.JavaConversions._
-import kafka.producer.KeyedMessage
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
+
+import kafka.producer.KeyedMessage
+import kafka.producer.Producer
+import kafka.producer.ProducerConfig
+import kafka.producer.ProducerPool
+import kafka.producer.async.DefaultEventHandler
+import kafka.serializer.Encoder
+
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemStream
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
 
 class TestKafkaSystemProducer {
 
@@ -137,7 +141,7 @@ class TestKafkaSystemProducer {
         override def send(messages: KeyedMessage[Object, Object]*) {
           assertNotNull(messages)
           assertEquals(1, messages.length)
-          assertEquals(messages(0).message, "a")
+          assertEquals("a", messages(0).message)
           msgsSent += 1
           if (msgsSent <= 5) {
             throw new RuntimeException("Pretend to fail in send")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
index 9ddcb71..e698d2f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
@@ -19,15 +19,17 @@
 
 package org.apache.samza.system.kafka
 
-import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
+
 import kafka.api.TopicMetadata
-import org.apache.samza.util.TopicMetadataStore
+
 import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
-import java.util.concurrent.CountDownLatch
 import org.apache.samza.util.Clock
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
 
 class TestTopicMetadataCache {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 1f09ae5..eefe114 100644
--- a/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv-leveldb/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -22,9 +22,9 @@ package org.apache.samza.storage.kv
 import java.io.File
 import java.util.Arrays
 import java.util.Random
-import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
 
-import scala.collection.JavaConversions._
+import org.apache.samza.serializers.Serde
+import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
 import org.iq80.leveldb.Options
 import org.junit.After
 import org.junit.Assert._
@@ -33,9 +33,9 @@ import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import org.apache.samza.serializers.Serde
 import org.scalatest.Assertions.intercept
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -106,7 +106,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   @Test
   def putAndGet() {
     store.put(b("k"), b("v"))
-    assertTrue(Arrays.equals(b("v"), store.get(b("k"))))
+    assertArrayEquals(b("v"), store.get(b("k")))
   }
 
   @Test
@@ -115,7 +115,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     store.put(k, b("v1"))
     store.put(k, b("v2"))
     store.put(k, b("v3"))
-    assertTrue(Arrays.equals(b("v3"), store.get(k)))
+    assertArrayEquals(b("v3"), store.get(k))
   }
 
   @Test
@@ -179,7 +179,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     val a = b("a")
     assertNull(store.get(a))
     store.put(a, a)
-    assertTrue(Arrays.equals(a, store.get(a)))
+    assertArrayEquals(a, store.get(a))
     store.delete(a)
     assertNull(store.get(a))
   }
@@ -190,9 +190,9 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     for (v <- vals) {
       assertNull(store.get(v))
       store.put(v, v)
-      assertTrue(Arrays.equals(v, store.get(v)))
+      assertArrayEquals(v, store.get(v))
     }
-    vals.foreach(v => assertTrue(Arrays.equals(v, store.get(v))))
+    vals.foreach(v => assertArrayEquals(v, store.get(v)))
     vals.foreach(v => store.delete(v))
     vals.foreach(v => assertNull(store.get(v)))
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
index 036d80c..0bdade0 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestJmxAppender.java
@@ -19,11 +19,14 @@
 
 package org.apache.samza.logging.log4j;
 
+import static org.junit.Assert.assertEquals;
+
 import java.lang.management.ManagementFactory;
 import java.rmi.registry.LocateRegistry;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
+
 import javax.management.Attribute;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerConnection;
@@ -32,13 +35,13 @@ import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXConnectorServer;
 import javax.management.remote.JMXConnectorServerFactory;
 import javax.management.remote.JMXServiceURL;
+
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
 
 /*
  * These tests assume that log4j.xml and log4j are both set on the classpath

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
index e2a153b..6046071 100644
--- a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
+++ b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
@@ -18,8 +18,10 @@
  */
 
 package org.apache.samza.serializers
+
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 
 class TestJsonSerde {
@@ -28,6 +30,6 @@ class TestJsonSerde {
     val serde = new JsonSerde
     val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)))
     val bytes = serde.toBytes(obj)
-    serde.fromBytes(bytes).equals(obj)
+    assertEquals(obj, serde.fromBytes(bytes))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
----------------------------------------------------------------------
diff --git a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
index b307334..5bc0be6 100644
--- a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
+++ b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala
@@ -18,15 +18,19 @@
  */
 
 package org.apache.samza.serializers
-import org.junit.Assert._
-import org.junit.Test
+
+import java.util.HashMap
+import java.util.Map
+
 import org.apache.samza.metrics.reporter.MetricsSnapshot
 import org.apache.samza.metrics.reporter.MetricsHeader
 import org.apache.samza.metrics.reporter.Metrics
-import java.util.HashMap
-import java.util.Map
+import org.junit.Assert._
+import org.junit.Ignore
+import org.junit.Test
 
 class TestMetricsSnapshotSerde {
+  @Ignore
   @Test
   def testMetricsSerdeShouldSerializeAndDeserializeAMetric {
     val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L)
@@ -38,6 +42,6 @@ class TestMetricsSnapshotSerde {
     val snapshot = new MetricsSnapshot(header, metrics)
     val serde = new MetricsSnapshotSerde()
     val bytes = serde.toBytes(snapshot)
-    serde.fromBytes(bytes).equals(metrics)
+    assertTrue(serde.fromBytes(bytes).equals(metrics))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 44ab623..118f5ee 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -20,7 +20,9 @@
 package org.apache.samza.test.integration
 
 import java.util.Properties
+import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
+
 import kafka.admin.AdminUtils
 import kafka.common.ErrorMapping
 import kafka.consumer.Consumer
@@ -36,12 +38,12 @@ import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
+
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.config.Config
 import org.apache.samza.job.local.ThreadJobFactory
-import java.util.concurrent.CountDownLatch
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.ApplicationStatus
@@ -59,6 +61,7 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore
 import org.apache.samza.util.TopicMetadataStore
 import org.junit.Assert._
 import org.junit.{BeforeClass, AfterClass, Test}
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
@@ -254,7 +257,7 @@ class TestStatefulTask {
     assertEquals("3", messages(2))
     assertEquals("2", messages(3))
     assertEquals("99", messages(4))
-    assertEquals(null, messages(5))
+    assertNull(messages(5))
 
     stopJob(job)
   }
@@ -297,14 +300,14 @@ class TestStatefulTask {
     assertEquals("3", messages(2))
     assertEquals("2", messages(3))
     assertEquals("99", messages(4))
-    assertEquals(null, messages(5))
+    assertNull(messages(5))
     // From second startup.
     assertEquals("1", messages(6))
     assertEquals("2", messages(7))
     assertEquals("3", messages(8))
     assertEquals("2", messages(9))
     assertEquals("99", messages(10))
-    assertEquals(null, messages(11))
+    assertNull(messages(11))
     // From sending in this method.
     assertEquals("4", messages(12))
     assertEquals("5", messages(13))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
index 1ff898d..d589d76 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/performance/TestSamzaContainerPerformance.scala
@@ -19,19 +19,20 @@
 
 package org.apache.samza.test.performance
 
-import org.apache.samza.job.local.ThreadJobFactory
-import org.junit.Test
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.config.Config
 import org.apache.samza.config.MapConfig
-import scala.collection.JavaConversions._
 import org.apache.samza.job.ShellCommandBuilder
+import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.task.InitableTask
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskContext
-import org.apache.samza.config.Config
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.util.Logging
+import org.junit.Test
+
+import scala.collection.JavaConversions._
 
 /**
  * A simple unit test that drives the TestPerformanceTask. This unit test can

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
index 190ce28..7b7d86a 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
@@ -19,14 +19,15 @@
 
 package org.apache.samza.job.yarn
 
-import scala.annotation.elidable
-import scala.annotation.elidable.ASSERTION
+import TestSamzaAppMasterTaskManager._
 
 import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus }
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.junit.Test
+import org.junit.Assert._
 
-import TestSamzaAppMasterTaskManager._
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
 
 class TestSamzaAppMaster {
   @Test
@@ -53,8 +54,8 @@ class TestSamzaAppMaster {
     }
     SamzaAppMaster.listeners = List(listener)
     SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    assert(listener.init == 1)
-    assert(listener.shutdown == 1)
+    assertEquals(1, listener.init)
+    assertEquals(1, listener.shutdown)
   }
 
   @Test
@@ -78,8 +79,8 @@ class TestSamzaAppMaster {
     // listener1 will throw an exception in shutdown, and listener2 should still get called
     SamzaAppMaster.listeners = List(listener1, listener2)
     SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
-    assert(listener1.shutdown == 1)
-    assert(listener2.shutdown == 1)
+    assertEquals(1, listener1.shutdown)
+    assertEquals(1, listener2.shutdown)
   }
 
   @Test
@@ -105,8 +106,8 @@ class TestSamzaAppMaster {
     thread.start
     thread.interrupt
     thread.join
-    assert(listener.init == 1)
-    assert(listener.shutdown == 1)
+    assertEquals(1, listener.init)
+    assertEquals(1, listener.shutdown)
   }
 
   @Test
@@ -127,8 +128,8 @@ class TestSamzaAppMaster {
     SamzaAppMaster.listeners = List(listener)
     SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
     // heartbeat may be triggered for more than once
-    assert(listener.allocated >= 1)
-    assert(listener.complete >= 1)
+    assertTrue(listener.allocated >= 1)
+    assertTrue(listener.complete >= 1)
   }
 
   @Test
@@ -145,6 +146,6 @@ class TestSamzaAppMaster {
     SamzaAppMaster.listeners = List(listener)
     SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
     // heartbeat may be triggered for more than once
-    assert(listener.reboot >= 1)
+    assertTrue(listener.reboot >= 1)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 3418a4c..6bf6aee 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -21,9 +21,6 @@ package org.apache.samza.job.yarn
 
 import java.nio.ByteBuffer
 
-import scala.annotation.elidable
-import scala.annotation.elidable.ASSERTION
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.records._
@@ -32,9 +29,12 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.SamzaException
+import org.mockito.Mockito
 import org.junit.Assert._
 import org.junit.Test
-import org.mockito.Mockito
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
 
 class TestSamzaAppMasterLifecycle {
   val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
@@ -84,8 +84,8 @@ class TestSamzaAppMasterLifecycle {
     state.rpcPort = 1
     val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
     saml.onInit
-    assert(amClient.host == "test")
-    assert(amClient.port == 1)
+    assertEquals("test", amClient.host)
+    assertEquals(1, amClient.port)
     assertFalse(saml.shouldShutdown)
   }
 
@@ -94,7 +94,7 @@ class TestSamzaAppMasterLifecycle {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.status = FinalApplicationStatus.SUCCEEDED
     new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
-    assert(amClient.status == FinalApplicationStatus.SUCCEEDED)
+    assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
   }
 
   @Test
@@ -106,7 +106,7 @@ class TestSamzaAppMasterLifecycle {
       // expected
       case e: SamzaException => gotException = true
     }
-    assert(gotException)
+    assertTrue(gotException)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index d68dbdd..a7ce241 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -19,14 +19,16 @@
 
 package org.apache.samza.job.yarn
 
-import scala.collection.JavaConversions._
-import org.apache.samza.config.MapConfig
-import org.junit.Assert._
-import org.junit.Test
 import java.io.BufferedReader
 import java.net.URL
 import java.io.InputStreamReader
+
 import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.config.MapConfig
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
 
 class TestSamzaAppMasterService {
   @Test
@@ -36,8 +38,8 @@ class TestSamzaAppMasterService {
 
     // start the dashboard
     service.onInit
-    assert(state.rpcPort > 0)
-    assert(state.trackingPort > 0)
+    assertTrue(state.rpcPort > 0)
+    assertTrue(state.trackingPort > 0)
 
     // check to see if it's running
     val url = new URL("http://127.0.0.1:%d/am" format state.rpcPort)
@@ -77,8 +79,8 @@ class TestSamzaAppMasterService {
 
     // start the dashboard
     service.onInit
-    assert(state.rpcPort > 0)
-    assert(state.trackingPort > 0)
+    assertTrue(state.rpcPort > 0)
+    assertTrue(state.trackingPort > 0)
 
     // Do a GET Request on the tracking port: This in turn will render index.scaml
     val url = new URL("http://127.0.0.1:%d/" format state.trackingPort)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 8cfdbe0..3f3154c 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -19,8 +19,6 @@
 
 package org.apache.samza.job.yarn
 
-import scala.collection.JavaConversions._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
@@ -41,6 +39,8 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.util.Util
 import org.junit.Test
 
+import scala.collection.JavaConversions._
+
 import TestSamzaAppMasterTaskManager._
 
 object TestSamzaAppMasterTaskManager {
@@ -153,7 +153,7 @@ class TestSamzaAppMasterTaskManager {
   def testAppMasterShouldDefaultToOneContainerIfTaskCountIsNotSpecified {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
-    assert(state.taskCount == 1)
+    assertEquals(1, state.taskCount)
   }
 
   @Test
@@ -161,13 +161,13 @@ class TestSamzaAppMasterTaskManager {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
 
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onContainerCompleted(getContainerStatus(state.containerId, 0, ""))
-    assert(taskManager.shouldShutdown == true)
-    assert(state.completedTasks == 1)
-    assert(state.taskCount == 1)
-    assert(state.jobHealthy)
-    assert(state.status.equals(FinalApplicationStatus.SUCCEEDED))
+    assertTrue(taskManager.shouldShutdown)
+    assertEquals(1, state.completedTasks)
+    assertEquals(1, state.taskCount)
+    assertTrue(state.jobHealthy)
+    assertEquals(FinalApplicationStatus.SUCCEEDED, state.status)
   }
 
   @Test
@@ -180,12 +180,12 @@ class TestSamzaAppMasterTaskManager {
       }
     }
 
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     val container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002")
     taskManager.onInit
     taskManager.onContainerAllocated(getContainer(container2))
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     assertFalse(state.jobHealthy)
 
     // 2. First is from onInit, second is from onContainerCompleted, since it failed.
@@ -195,13 +195,13 @@ class TestSamzaAppMasterTaskManager {
 
     // 3. Now trigger an AM shutdown since our retry count is 1, and we're failing twice
     taskManager.onContainerAllocated(getContainer(container2))
-    assert(state.jobHealthy)
+    assertTrue(state.jobHealthy)
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
     assertEquals(2, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
     assertFalse(state.jobHealthy)
     assertTrue(taskManager.shouldShutdown)
-    assert(state.status.equals(FinalApplicationStatus.FAILED))
+    assertEquals(FinalApplicationStatus.FAILED, state.status)
   }
 
   @Test
@@ -224,60 +224,60 @@ class TestSamzaAppMasterTaskManager {
     val container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002")
     val container3 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000003")
 
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onInit
-    assert(taskManager.shouldShutdown == false)
-    assert(amClient.getClient.requests.size == 1)
-    assert(amClient.getClient.getRelease.size == 0)
+    assertFalse(taskManager.shouldShutdown)
+    assertEquals(1, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
 
     // allocate container 2
     taskManager.onContainerAllocated(getContainer(container2))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(containersRequested == 1)
-    assert(containersStarted == 1)
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, containersRequested)
+    assertEquals(1, containersStarted)
 
     // allocate an extra container, which the AM doesn't need, and should be released
     taskManager.onContainerAllocated(getContainer(container3))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(amClient.getClient.requests.size == 1)
-    assert(amClient.getClient.getRelease.size == 1)
-    assert(amClient.getClient.getRelease.head.equals(container3))
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, amClient.getClient.requests.size)
+    assertEquals(1, amClient.getClient.getRelease.size)
+    assertEquals(container3, amClient.getClient.getRelease.head)
 
     // reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
     amClient.getClient.requests = List()
     amClient.getClient.resetRelease
 
     // now release the container, and make sure the AM doesn't ask for more
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onContainerCompleted(getContainerStatus(container3, -100, "pretend the container was released"))
-    assert(taskManager.shouldShutdown == false)
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(amClient.getClient.requests.size == 0)
-    assert(amClient.getClient.getRelease.size == 0)
+    assertFalse(taskManager.shouldShutdown)
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(0, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
 
     // pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
-    assert(taskManager.shouldShutdown == false)
-    assert(state.jobHealthy == false)
-    assert(amClient.getClient.requests.size == 1)
-    assert(amClient.getClient.getRelease.size == 0)
+    assertFalse(taskManager.shouldShutdown)
+    assertFalse(state.jobHealthy)
+    assertEquals(1, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
 
     taskManager.onContainerAllocated(getContainer(container2))
-    assert(state.neededContainers == 0)
-    assert(state.jobHealthy)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
+    assertEquals(0, state.neededContainers)
+    assertTrue(state.jobHealthy)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
   }
 
   @Test
@@ -297,57 +297,57 @@ class TestSamzaAppMasterTaskManager {
     val container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002")
     val container3 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000003")
 
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onInit
-    assert(taskManager.shouldShutdown == false)
-    assert(amClient.getClient.requests.size == 2)
-    assert(amClient.getClient.getRelease.size == 0)
+    assertFalse(taskManager.shouldShutdown)
+    assertEquals(2, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
     taskManager.onContainerAllocated(getContainer(container2))
-    assert(state.neededContainers == 1)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 1)
-    assert(containersStarted == 1)
+    assertEquals(1, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(1, state.unclaimedTasks.size)
+    assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 2)
-    assert(state.taskToTaskNames.size == 2)
-    assert(state.unclaimedTasks.size == 0)
-    assert(containersStarted == 2)
+    assertEquals(0, state.neededContainers)
+    assertEquals(2, state.runningTasks.size)
+    assertEquals(2, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(2, containersStarted)
 
     // container2 finishes successfully
     taskManager.onContainerCompleted(getContainerStatus(container2, 0, ""))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(state.completedTasks == 1)
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.completedTasks)
 
     // container3 fails
     taskManager.onContainerCompleted(getContainerStatus(container3, 1, "expected failure here"))
-    assert(state.neededContainers == 1)
-    assert(state.runningTasks.size == 0)
-    assert(state.taskToTaskNames.size == 0)
-    assert(state.unclaimedTasks.size == 1)
-    assert(state.completedTasks == 1)
-    assert(taskManager.shouldShutdown == false)
+    assertEquals(1, state.neededContainers)
+    assertEquals(0, state.runningTasks.size)
+    assertEquals(0, state.taskToTaskNames.size)
+    assertEquals(1, state.unclaimedTasks.size)
+    assertEquals(1, state.completedTasks)
+    assertFalse(taskManager.shouldShutdown)
 
     // container3 is re-allocated
     taskManager.onContainerAllocated(getContainer(container3))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(containersStarted == 3)
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(3, containersStarted)
 
     // container3 finishes sucecssfully
     taskManager.onContainerCompleted(getContainerStatus(container3, 0, ""))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 0)
-    assert(state.taskToTaskNames.size == 0)
-    assert(state.unclaimedTasks.size == 0)
-    assert(state.completedTasks == 2)
-    assert(taskManager.shouldShutdown == true)
+    assertEquals(0, state.neededContainers)
+    assertEquals(0, state.runningTasks.size)
+    assertEquals(0, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(2, state.completedTasks)
+    assertTrue(taskManager.shouldShutdown)
   }
 
   @Test
@@ -369,32 +369,32 @@ class TestSamzaAppMasterTaskManager {
     val container2 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000002")
     val container3 = ConverterUtils.toContainerId("container_1350670447861_0003_01_000003")
 
-    assert(taskManager.shouldShutdown == false)
+    assertFalse(taskManager.shouldShutdown)
     taskManager.onInit
-    assert(taskManager.shouldShutdown == false)
-    assert(amClient.getClient.requests.size == 1)
-    assert(amClient.getClient.getRelease.size == 0)
-    assert(state.neededContainers == 1)
-    assert(state.runningTasks.size == 0)
-    assert(state.taskToTaskNames.size == 0)
-    assert(state.unclaimedTasks.size == 1)
+    assertFalse(taskManager.shouldShutdown)
+    assertEquals(1, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
+    assertEquals(1, state.neededContainers)
+    assertEquals(0, state.runningTasks.size)
+    assertEquals(0, state.taskToTaskNames.size)
+    assertEquals(1, state.unclaimedTasks.size)
     taskManager.onContainerAllocated(getContainer(container2))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(containersRequested == 1)
-    assert(containersStarted == 1)
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, containersRequested)
+    assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
-    assert(state.neededContainers == 0)
-    assert(state.runningTasks.size == 1)
-    assert(state.taskToTaskNames.size == 1)
-    assert(state.unclaimedTasks.size == 0)
-    assert(containersRequested == 1)
-    assert(containersStarted == 1)
-    assert(amClient.getClient.requests.size == 1)
-    assert(amClient.getClient.getRelease.size == 1)
-    assert(amClient.getClient.getRelease.head.equals(container3))
+    assertEquals(0, state.neededContainers)
+    assertEquals(1, state.runningTasks.size)
+    assertEquals(1, state.taskToTaskNames.size)
+    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, containersRequested)
+    assertEquals(1, containersStarted)
+    assertEquals(1, amClient.getClient.requests.size)
+    assertEquals(1, amClient.getClient.getRelease.size)
+    assertTrue(amClient.getClient.getRelease.head.equals(container3))
   }
 
   val clock = () => System.currentTimeMillis


[2/2] git commit: SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods

Posted by cr...@apache.org.
SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/811f2897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/811f2897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/811f2897

Branch: refs/heads/master
Commit: 811f2897c640c684f4506a6bab7590304de34fca
Parents: cb40a59
Author: David Chen <dc...@linkedin.com>
Authored: Tue Sep 16 09:29:20 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Sep 16 09:29:20 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/TestConfig.java     |  45 ++--
 .../metrics/TestSlidingTimeWindowReservoir.java |  20 +-
 .../org/apache/samza/metrics/TestTimer.java     |  14 +-
 .../samza/util/TestBlockingEnvelopeMap.java     |   2 +
 .../samza/util/TestNoOpMetricsRegistry.java     |  26 ++-
 ...inglePartitionWithoutOffsetsSystemAdmin.java |  10 +-
 .../samza/checkpoint/TestCheckpointTool.scala   |   3 +-
 .../samza/checkpoint/TestOffsetManager.scala    |  11 +-
 .../factories/TestPropertiesConfigFactory.scala |   4 +-
 .../samza/container/TestSamzaContainer.scala    |  14 +-
 .../samza/container/TestTaskInstance.scala      |  29 +--
 .../TestTaskNamesToSystemStreamPartitions.scala |   6 +-
 .../grouper/stream/GroupByTestBase.scala        |  13 +-
 .../grouper/stream/TestGroupByPartition.scala   |  10 +-
 .../TestGroupBySystemStreamPartition.scala      |  15 +-
 .../org/apache/samza/job/TestJobRunner.scala    |   5 +-
 .../samza/job/TestShellCommandBuilder.scala     |   2 +-
 .../apache/samza/job/local/TestProcessJob.scala |   3 +-
 .../apache/samza/job/local/TestThreadJob.scala  |   3 +-
 .../apache/samza/metrics/TestJmxServer.scala    |  17 +-
 .../metrics/reporter/TestJmxReporter.scala      |  19 +-
 .../samza/serializers/TestByteSerde.scala       |  11 +-
 .../samza/serializers/TestCheckpointSerde.scala |   2 +
 .../samza/serializers/TestIntegerSerde.scala    |   7 +-
 .../samza/serializers/TestStringSerde.scala     |   5 +-
 .../samza/system/TestSystemConsumers.scala      |  19 +-
 .../system/chooser/TestBatchingChooser.scala    |  22 +-
 .../chooser/TestBootstrappingChooser.scala      |  70 +++---
 .../system/chooser/TestDefaultChooser.scala     |  23 +-
 .../system/chooser/TestRoundRobinChooser.scala  |  19 +-
 .../chooser/TestTieredPriorityChooser.scala     |  56 ++---
 .../filereader/TestFileReaderSystemAdmin.scala  |  16 +-
 .../TestFileReaderSystemConsumer.scala          |   2 +
 .../TestFileReaderSystemFactory.scala           |   7 +-
 .../samza/task/TestReadableCoordinator.scala    |   4 +-
 .../samza/util/TestDaemonThreadFactory.scala    |   7 +-
 .../util/TestExponentialSleepStrategy.scala     |   4 +-
 .../scala/org/apache/samza/util/TestUtil.scala  |   2 +
 .../kafka/TestKafkaCheckpointLogKey.scala       |   2 +-
 .../kafka/TestKafkaCheckpointManager.scala      |   9 +-
 .../apache/samza/config/TestKafkaConfig.scala   |  11 +-
 .../samza/config/TestKafkaSerdeConfig.scala     |   8 +-
 .../samza/config/TestRegExTopicGenerator.scala  |   6 +-
 .../samza/serializers/TestKafkaSerde.scala      |   8 +-
 .../samza/system/kafka/TestBrokerProxy.scala    |  26 +--
 .../samza/system/kafka/TestGetOffset.scala      |  16 +-
 .../system/kafka/TestKafkaSystemAdmin.scala     |  37 ++--
 .../system/kafka/TestKafkaSystemConsumer.scala  |  15 +-
 .../system/kafka/TestKafkaSystemFactory.scala   |  11 +-
 .../system/kafka/TestKafkaSystemProducer.scala  |  24 ++-
 .../system/kafka/TestTopicMetadataCache.scala   |  14 +-
 .../samza/storage/kv/TestKeyValueStores.scala   |  16 +-
 .../samza/logging/log4j/TestJmxAppender.java    |   5 +-
 .../samza/serializers/TestJsonSerde.scala       |   4 +-
 .../serializers/TestMetricsSnapshotSerde.scala  |  14 +-
 .../test/integration/TestStatefulTask.scala     |  11 +-
 .../TestSamzaContainerPerformance.scala         |  17 +-
 .../samza/job/yarn/TestSamzaAppMaster.scala     |  25 +--
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  16 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |  18 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 214 +++++++++----------
 61 files changed, 587 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index e701296..b4100c2 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -19,13 +19,18 @@
 
 package org.apache.samza.config;
 
-import org.junit.Assert.* ;
-import org.junit.Test ;
-import java.util.Map ;
-import java.util.HashMap ;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
 
 public class TestConfig {
-  // Utility methods to make it easier to tell the class of a primitive via overloaded args
+  /**
+   * Utility methods to make it easier to tell the class of a primitive via
+   * overloaded args
+   */
   Class getClass(long l) {
     return Long.class ;
   }
@@ -35,25 +40,25 @@ public class TestConfig {
   }
 
   @Test
-  public void testgetShortAndLong(){
-    Map<String, String> m = new HashMap<String, String>() { {
-      put("testkey", "11") ;
-    } } ;
+  public void testgetShortAndLong() {
+    Map<String, String> m = new HashMap<String, String>() {{
+      put("testkey", "11");
+    }};
 
-    MapConfig mc = new MapConfig(m) ;
-    short defaultShort=0 ;
-    long defaultLong=0 ;
+    MapConfig mc = new MapConfig(m);
+    short defaultShort = 0;
+    long defaultLong = 0;
 
-    Class c1 = getClass(mc.getShort("testkey")) ;
-    assert(c1 == Short.class) ;
+    Class c1 = getClass(mc.getShort("testkey"));
+    assertEquals(Short.class, c1);
 
-    Class c2 = getClass(mc.getShort("testkey", defaultShort)) ;
-    assert(c2 == Short.class) ;
+    Class c2 = getClass(mc.getShort("testkey", defaultShort));
+    assertEquals(Short.class, c2);
 
-    Class c3 = getClass(mc.getLong("testkey")) ;
-    assert(c3 == Long.class) ;
+    Class c3 = getClass(mc.getLong("testkey"));
+    assertEquals(Long.class, c3);
 
-    Class c4 = getClass(mc.getLong("testkey", defaultLong)) ;
-    assert(c4 == Long.class) ;
+    Class c4 = getClass(mc.getLong("testkey", defaultLong));
+    assertEquals(Long.class, c4);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
index eb5043b..d392b32 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.metrics;
 
+import static org.mockito.Mockito.*;
 import static org.junit.Assert.*;
 
 import java.util.Arrays;
@@ -26,15 +27,14 @@ import java.util.Arrays;
 import org.apache.samza.util.Clock;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
-
 public class TestSlidingTimeWindowReservoir {
 
   private final Clock clock = mock(Clock.class);
 
   @Test
   public void testUpdateSizeSnapshot() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
 
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
@@ -49,24 +49,26 @@ public class TestSlidingTimeWindowReservoir {
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
-    assertTrue(snapshot.getSize() == 3);
+    assertEquals(3, snapshot.getSize());
   }
 
   @Test
   public void testDuplicateTime() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
     slidingTimeWindowReservoir.update(2L);
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
-    assertTrue(snapshot.getSize() == 2);
+    assertEquals(2, snapshot.getSize());
   }
 
   @Test
   public void testRemoveExpiredValues() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
 
@@ -81,6 +83,6 @@ public class TestSlidingTimeWindowReservoir {
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
-    assertTrue(snapshot.getSize() == 2);
+    assertEquals(2, snapshot.getSize());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index dcc3cb8..63c183f 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -28,7 +28,9 @@ import org.junit.Test;
 
 public class TestTimer {
 
-  // mock clock
+  /*
+   * Mock clock
+   */
   private final Clock clock = new Clock() {
     long value = 0;
 
@@ -46,7 +48,7 @@ public class TestTimer {
 
     Snapshot snapshot = timer.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
-    assertTrue(snapshot.getValues().size() == 2);
+    assertEquals(2, snapshot.getValues().size());
   }
 
   @Test
@@ -58,13 +60,13 @@ public class TestTimer {
 
     Snapshot snapshot = timer.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
-    assertTrue(snapshot.getValues().size() == 3);
+    assertEquals(3, snapshot.getValues().size());
 
-    // the time is 500 for update(4L) because getSnapshot calls clock once + 3
+    // The time is 500 for update(4L) because getSnapshot calls clock once + 3
     // updates that call clock 3 times
     timer.update(4L);
     Snapshot snapshot2 = timer.getSnapshot();
     assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L)));
-    assertTrue(snapshot2.getValues().size() == 2);
+    assertEquals(2, snapshot2.getValues().size());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 35ba52d..4eb87eb 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.samza.Partition;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
index 2d0034f..1d1e3c5 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
@@ -33,32 +33,38 @@ public class TestNoOpMetricsRegistry {
     Counter counter1 = registry.newCounter("testc", "a");
     Counter counter2 = registry.newCounter("testc", "b");
     Counter counter3 = registry.newCounter("testc2", "c");
+
     Gauge<String> gauge1 = registry.newGauge("testg", "a", "1");
     Gauge<String> gauge2 = registry.newGauge("testg", "b", "2");
     Gauge<String> gauge3 = registry.newGauge("testg", "c", "3");
     Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4");
+
     Timer timer1 = registry.newTimer("testt", "a");
     Timer timer2 = registry.newTimer("testt", "b");
     Timer timer3 = registry.newTimer("testt2", "c");
+
     counter1.inc();
     counter2.inc(2);
     counter3.inc(4);
+
     gauge1.set("5");
     gauge2.set("6");
     gauge3.set("7");
     gauge4.set("8");
+
     timer1.update(1L);
     timer2.update(2L);
     timer3.update(3L);
-    assertEquals(counter1.getCount(), 1);
-    assertEquals(counter2.getCount(), 2);
-    assertEquals(counter3.getCount(), 4);
-    assertEquals(gauge1.getValue(), "5");
-    assertEquals(gauge2.getValue(), "6");
-    assertEquals(gauge3.getValue(), "7");
-    assertEquals(gauge4.getValue(), "8");
-    assertEquals(timer1.getSnapshot().getAverage(), 1, 0);
-    assertEquals(timer2.getSnapshot().getAverage(), 2, 0);
-    assertEquals(timer3.getSnapshot().getAverage(), 3, 0);
+
+    assertEquals(1, counter1.getCount());
+    assertEquals(2, counter2.getCount());
+    assertEquals(4, counter3.getCount());
+    assertEquals("5", gauge1.getValue());
+    assertEquals("6", gauge2.getValue());
+    assertEquals("7", gauge3.getValue());
+    assertEquals("8", gauge4.getValue());
+    assertEquals(1, timer1.getSnapshot().getAverage(), 0);
+    assertEquals(2, timer2.getSnapshot().getAverage(), 0);
+    assertEquals(3, timer3.getSnapshot().getAverage(), 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
index 4166493..025f0a6 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
@@ -20,10 +20,12 @@
 package org.apache.samza.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -36,12 +38,14 @@ public class TestSinglePartitionWithoutOffsetsSystemAdmin {
     Set<String> streamNames = new HashSet<String>();
     streamNames.add("a");
     streamNames.add("b");
+
     Map<String, SystemStreamMetadata> metadata = admin.getSystemStreamMetadata(streamNames);
-    assertEquals(metadata.size(), 2);
+    assertEquals(2, metadata.size());
     SystemStreamMetadata metadata1 = metadata.get("a");
     SystemStreamMetadata metadata2 = metadata.get("b");
+
     assertEquals(1, metadata1.getSystemStreamPartitionMetadata().size());
     assertEquals(1, metadata2.getSystemStreamPartitionMetadata().size());
-    assertEquals(null, metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
+    assertNull(metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 1eb3995..af800df 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.checkpoint
 
 import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
 import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
 import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
 import org.apache.samza.metrics.MetricsRegistry
@@ -30,8 +31,8 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
+
 import scala.collection.JavaConversions._
-import org.apache.samza.container.TaskName
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 44a98a5..a79ecca 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -19,7 +19,9 @@
 
 package org.apache.samza.checkpoint
 
-import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.samza.container.TaskName
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
@@ -30,10 +32,10 @@ import org.junit.{Ignore, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.system.SystemAdmin
-import java.util
-import org.apache.samza.container.TaskName
 import org.scalatest.Assertions.intercept
 
+import scala.collection.JavaConversions._
+
 class TestOffsetManager {
   @Test
   def testSystemShouldUseDefaults {
@@ -47,7 +49,7 @@ class TestOffsetManager {
     val offsetManager = OffsetManager(systemStreamMetadata, config)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
-    assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
+    assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
     assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
     assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
   }
@@ -232,7 +234,6 @@ class TestOffsetManager {
       override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
 
       override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
index f254741..9688abb 100644
--- a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
@@ -18,8 +18,10 @@
  */
 
 package org.apache.samza.config.factories
+
 import java.net.URI
 import java.io.File
+
 import org.apache.samza.SamzaException
 import org.junit.Assert._
 import org.junit.Test
@@ -30,7 +32,7 @@ class TestPropertiesConfigFactory {
   @Test
   def testCanReadPropertiesConfigFiles {
     val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
-    assert("bar".equals(config.get("foo")))
+    assertEquals("bar", config.get("foo"))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 8a04a8a..b7a9569 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -24,26 +24,26 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system.SystemConsumer
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.scalatest.junit.AssertionsForJUnit
-import org.apache.samza.metrics.JmxServer
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index be53373..c31a74e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -21,27 +21,28 @@ package org.apache.samza.container
 
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.config.MapConfig
 import org.apache.samza.Partition
-import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.SystemProducer
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.MapConfig
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.chooser.RoundRobinChooser
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskInstanceCollector
 
+import scala.collection.JavaConversions._
+
 class TestTaskInstance {
   @Test
   def testOffsetsAreUpdatedOnProcess {
@@ -80,4 +81,4 @@ class TestTaskInstance {
     assertTrue(lastProcessedOffset.isDefined)
     assertEquals("2", lastProcessedOffset.get)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
index d680b20..9a3406e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.container
 
-import org.junit.Test
-import org.junit.Assert._
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.{SamzaException, Partition}
+import org.junit.Test
+import org.junit.Assert._
 
 class TestTaskNamesToSystemStreamPartitions {
   var sspCounter = 0
@@ -36,7 +36,7 @@ class TestTaskNamesToSystemStreamPartitions {
     val asSet = tntssp.toSet
     val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
                       (new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2"))))
-    assertEquals(expected , asSet)
+    assertEquals(expected, asSet)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
index 47d716e..a14169b 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
@@ -18,15 +18,16 @@
  */
 package org.apache.samza.container.grouper.stream
 
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
+import java.util.Collections
 import java.util.HashSet
 import java.util.Map
 import java.util.Set
-import org.junit.Assert._
-import java.util.Collections
+
+import org.apache.samza.Partition
 import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import org.junit.Assert._
 
 object GroupByTestBase {
   val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
@@ -54,4 +55,4 @@ abstract class GroupByTestBase {
     val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
     assertEquals(output, result)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
index 2fa718c..74daf72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
@@ -19,15 +19,17 @@
 package org.apache.samza.container.grouper.stream
 
 import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class TestGroupByPartition extends GroupByTestBase {
   import GroupByTestBase._
 
-  val expected /* from base class provided set */ =  Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
-                                                         new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
-                                                         new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+  // from base class provided set
+  val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+                     new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+                     new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
 
   override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
index 8da0595..deb3895 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
@@ -19,19 +19,20 @@
 package org.apache.samza.container.grouper.stream
 
 import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
 import org.junit.Test
+import scala.collection.JavaConverters._
 
 class TestGroupBySystemStreamPartition extends GroupByTestBase {
   import GroupByTestBase._
 
   // Building manually to avoid just duplicating a logic potential logic error here and there
-  val expected /* from base class provided set */ =  Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
-    new TaskName(aa1.toString) -> Set(aa1).asJava,
-    new TaskName(aa2.toString) -> Set(aa2).asJava,
-    new TaskName(ab1.toString) -> Set(ab1).asJava,
-    new TaskName(ab2.toString) -> Set(ab2).asJava,
-    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+  // From base class provided set
+  val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+                     new TaskName(aa1.toString) -> Set(aa1).asJava,
+                     new TaskName(aa2.toString) -> Set(aa2).asJava,
+                     new TaskName(ab1.toString) -> Set(ab1).asJava,
+                     new TaskName(ab2.toString) -> Set(ab2).asJava,
+                     new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
 
   override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 258ccc1..52057ed 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -18,9 +18,12 @@
  */
 
 package org.apache.samza.job
+
 import java.io.File
+
 import org.apache.samza.config.Config
 import org.junit.Test
+import org.junit.Assert._
 
 object TestJobRunner {
   var processCount = 0
@@ -34,7 +37,7 @@ class TestJobRunner {
       "org.apache.samza.config.factories.PropertiesConfigFactory",
       "--config-path",
       "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
-    assert(TestJobRunner.processCount == 1)
+    assertEquals(1, TestJobRunner.processCount)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index f8a535a..b186ec1 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.job
 
-import org.junit.Test
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.util.Util._
 import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
 import org.junit.Assert._
+import org.junit.Test
 
 class TestShellCommandBuilder {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index d56024d..7f3ccfe 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.job.local;
+
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -39,6 +40,6 @@ class TestProcessJob {
     job.waitForFinish(500)
     job.kill
     job.waitForFinish(999999)
-    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index 7d45889..4f3f511 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.job.local
+
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -44,6 +45,6 @@ class TestThreadJob {
     job.waitForFinish(500)
     job.kill
     job.waitForFinish(999999)
-    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
index f01117d..f49cfaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -22,14 +22,15 @@ package org.apache.samza.metrics
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.util.Logging
-import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+
 import java.io.IOException
 
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 
 class TestJmxServer extends Logging {
   @Test
   def serverStartsUp {
-    var jmxServer:JmxServer = null
+    var jmxServer: JmxServer = null
 
     try {
       jmxServer = new JmxServer
@@ -45,14 +46,16 @@ class TestJmxServer extends Logging {
         assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0)
       } catch {
         case ioe:IOException => fail("Couldn't open connection to local JMX server")
-      }finally {
-        if(jmxConnector != null) jmxConnector.close
+      } finally {
+        if (jmxConnector != null) {
+          jmxConnector.close
+        }
       }
 
     } finally {
-      if (jmxServer != null) jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index f6c8646..3cfd439 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -23,20 +23,23 @@ import org.junit.Assert._
 import org.junit.AfterClass
 import org.junit.BeforeClass
 import org.junit.Test
-import scala.collection.JavaConversions._
 import org.apache.samza.task.TaskContext
-import javax.management.remote.JMXConnectorFactory
 import org.apache.samza.metrics.MetricsRegistryMap
-import javax.management.remote.JMXConnectorServerFactory
-import javax.management.remote.JMXConnectorServer
-import java.rmi.registry.LocateRegistry
-import javax.management.remote.JMXServiceURL
 import org.apache.samza.config.MapConfig
-import java.lang.management.ManagementFactory
 import org.apache.samza.Partition
-import javax.management.ObjectName
 import org.apache.samza.metrics.JvmMetrics
 
+import java.lang.management.ManagementFactory
+import java.rmi.registry.LocateRegistry
+
+import javax.management.ObjectName
+import javax.management.remote.JMXServiceURL
+import javax.management.remote.JMXConnectorServerFactory
+import javax.management.remote.JMXConnectorServer
+import javax.management.remote.JMXConnectorFactory
+
+import scala.collection.JavaConversions._
+
 object TestJmxReporter {
   val port = 4500
   val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
index f64c263..f605762 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
@@ -19,19 +19,20 @@
 
 package org.apache.samza.serializers
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestByteSerde {
   @Test
   def testByteSerde {
     val serde = new ByteSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
 
     val testBytes = "A lazy way of creating a byte array".getBytes()
-    assertTrue(Arrays.equals(serde.toBytes(testBytes), testBytes))
-    assertTrue( Arrays.equals(serde.fromBytes(testBytes), testBytes))
+    assertArrayEquals(serde.toBytes(testBytes), testBytes)
+    assertArrayEquals(serde.fromBytes(testBytes), testBytes)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 0d07314..3d0a603 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -20,12 +20,14 @@
 package org.apache.samza.serializers
 
 import java.util
+
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 
 class TestCheckpointSerde {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
index 45a2b04..ad646d7 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
@@ -19,9 +19,10 @@
 
 package org.apache.samza.serializers
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestIntegerSerde {
   @Test
@@ -33,7 +34,7 @@ class TestIntegerSerde {
     val fooBar = 37
     val fooBarBytes = serde.toBytes(fooBar)
     fooBarBytes.foreach(System.err.println)
-    assertTrue(Arrays.equals(Array[Byte](0, 0, 0, 37), fooBarBytes))
+    assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
     assertEquals(fooBar, serde.fromBytes(fooBarBytes))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
index 7fbf0c2..a1e8e88 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
@@ -21,7 +21,6 @@ package org.apache.samza.serializers
 
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestStringSerde {
   @Test
@@ -32,7 +31,7 @@ class TestStringSerde {
 
     val fooBar = "foo bar"
     val fooBarBytes = serde.toBytes(fooBar)
-    assertTrue(Arrays.equals(fooBar.getBytes("UTF-8"), fooBarBytes))
+    assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
     assertEquals(fooBar, serde.fromBytes(fooBarBytes))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 04229a6..3fdc781 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.system
 
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.serializers._
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.serializers._
 import org.apache.samza.system.chooser.MockMessageChooser
+import org.apache.samza.util.BlockingEnvelopeMap
+
+import scala.collection.JavaConversions._
 
 class TestSystemConsumers {
   def testPollIntervalMs {
@@ -44,7 +45,7 @@ class TestSystemConsumers {
     consumers.register(systemStreamPartition1, "1234")
     consumers.start
 
-    // Tell the consumer to respond with 1000 messages for SSP0, and no 
+    // Tell the consumer to respond with 1000 messages for SSP0, and no
     // messages for SSP1.
     consumer.setResponseSizes(numEnvelopes)
 
@@ -60,13 +61,13 @@ class TestSystemConsumers {
     // We aren't polling because we're getting non-null envelopes.
     assertEquals(2, consumer.polls)
 
-    // Advance the clock to trigger a new poll even though there are still 
+    // Advance the clock to trigger a new poll even though there are still
     // messages.
     now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
 
     assertEquals(envelope, consumers.choose)
 
-    // We polled even though there are still 997 messages in the unprocessed 
+    // We polled even though there are still 997 messages in the unprocessed
     // message buffer.
     assertEquals(3, consumer.polls)
     assertEquals(1, consumer.lastPoll.size)
@@ -74,7 +75,7 @@ class TestSystemConsumers {
     // Only SSP1 was polled because we still have messages for SSP2.
     assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
 
-    // Now drain all messages for SSP0. There should be exactly 997 messages, 
+    // Now drain all messages for SSP0. There should be exactly 997 messages,
     // since we have chosen 3 already, and we started with 1000.
     (0 until (numEnvelopes - 3)).foreach { i =>
       assertEquals(envelope, consumers.choose)
@@ -296,4 +297,4 @@ class TestSystemConsumers {
     def stop {}
     def register { super.register(systemStreamPartition, "0") }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
index d7632b4..6d53697 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
@@ -19,16 +19,18 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
+import org.apache.samza.Partition
 import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
@@ -45,9 +47,9 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
     chooser.start
     // Make sure start and register are working.
     assertEquals(1, mock.starts)
-    assertEquals(null, mock.registers(envelope1.getSystemStreamPartition))
+    assertNull(mock.registers(envelope1.getSystemStreamPartition))
     assertEquals("", mock.registers(envelope2.getSystemStreamPartition))
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     assertEquals(envelope1, mock.getEnvelopes.head)
     assertEquals(envelope1, chooser.choose)
@@ -84,11 +86,11 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
 }
 
 object TestBatchingChooser {
-  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with 
-  // just batch size defined should behave just like plain vanilla batching 
+  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+  // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Int) => MessageChooser]] = Arrays.asList(
     Array((wrapped: MessageChooser, batchSize: Int) => new BatchingChooser(wrapped, batchSize)),
     Array((wrapped: MessageChooser, batchSize: Int) => new DefaultChooser(wrapped, Some(batchSize))))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 993daa6..3c2693c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -19,20 +19,22 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
 import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -61,7 +63,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition))
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.stop
     assertEquals(1, mock.stops)
   }
@@ -72,16 +74,16 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val metadata = getMetadata(envelope1, "100", Some("123"))
     val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
 
-    // Even though envelope1's SSP is registered as a bootstrap stream, since 
-    // 123=123, it should be marked as "caught up" and treated like a normal 
-    // stream. This means that non-bootstrap stream envelope should be allowed 
+    // Even though envelope1's SSP is registered as a bootstrap stream, since
+    // 123=123, it should be marked as "caught up" and treated like a normal
+    // stream. This means that non-bootstrap stream envelope should be allowed
     // to be chosen.
     chooser.register(envelope1.getSystemStreamPartition, "123")
     chooser.register(envelope2.getSystemStreamPartition, "321")
     chooser.start
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -90,40 +92,40 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val metadata = getMetadata(envelope1, "123")
     val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
 
-    // Even though envelope1's SSP is registered as a bootstrap stream, since 
-    // 123=123, it should be marked as "caught up" and treated like a normal 
-    // stream. This means that non-bootstrap stream envelope should be allowed 
+    // Even though envelope1's SSP is registered as a bootstrap stream, since
+    // 123=123, it should be marked as "caught up" and treated like a normal
+    // stream. This means that non-bootstrap stream envelope should be allowed
     // to be chosen.
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
     chooser.update(envelope2)
-    // Choose should not return anything since bootstrapper is blocking 
+    // Choose should not return anything since bootstrapper is blocking
     // wrapped.choose until it gets an update from envelope1's SSP.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
-    // Now that we have an update from the required SSP, the mock chooser 
+    // Now that we have an update from the required SSP, the mock chooser
     // should be called, and return.
     assertEquals(envelope2, chooser.choose)
-    // The chooser still has an envelope from envelope1's SSP, so it should 
+    // The chooser still has an envelope from envelope1's SSP, so it should
     // return.
     assertEquals(envelope1, chooser.choose)
     // No envelope for envelope1's SSP has been given, so it should block.
     chooser.update(envelope2)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Now we're giving an envelope with the proper last offset (123), so no
     // envelope1's SSP should be treated no differently than envelope2's.
     chooser.update(envelope4)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Should not block here since there are no more lagging bootstrap streams.
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -138,54 +140,54 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     chooser.register(envelope3.getSystemStreamPartition, "1")
     chooser.start
     chooser.update(envelope1)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope3)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
 
     // Fully loaded now.
     assertEquals(envelope1, chooser.choose)
     // Can't pick again because envelope1's SSP is missing.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     // Can pick again.
     assertEquals(envelope3, chooser.choose)
     // Can still pick since envelope3.SSP isn't being tracked.
     assertEquals(envelope2, chooser.choose)
     // Can't pick since envelope2.SSP needs an envelope now.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     // Now we get envelope1 again.
     assertEquals(envelope1, chooser.choose)
     // Can't pick again.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Now use envelope4, to trigger "all caught up" for envelope1.SSP.
     chooser.update(envelope4)
     // Chooser's contents is currently: e2, e4 (System.err.println(mock.getEnvelopes))
     // Add envelope3, whose SSP isn't being tracked.
     chooser.update(envelope3)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     // Chooser's contents is currently: e4, e3, e2 (System.err.println(mock.getEnvelopes))
     assertEquals(envelope4, chooser.choose)
-    // This should be allowed, even though no message from envelope1.SSP is 
-    // available, since envelope4 triggered "all caught up" because its offset 
-    // matches the offset map for this SSP, and we still have an envelope for 
+    // This should be allowed, even though no message from envelope1.SSP is
+    // available, since envelope4 triggered "all caught up" because its offset
+    // matches the offset map for this SSP, and we still have an envelope for
     // envelope2.SSP in the queue.
     assertEquals(envelope3, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Fin.
   }
 }
 
 object TestBootstrappingChooser {
-  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with 
-  // just batch size defined should behave just like plain vanilla batching 
+  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+  // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
     Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)),
     Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata)))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 884e458..0909956 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -26,12 +26,13 @@ import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
-import scala.collection.JavaConversions._
 import org.apache.samza.config.DefaultChooserConfig
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 
+import scala.collection.JavaConversions._
+
 class TestDefaultChooser {
   val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
   val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
@@ -47,7 +48,7 @@ class TestDefaultChooser {
     val mock0 = new MockMessageChooser
     val mock1 = new MockMessageChooser
     val mock2 = new MockMessageChooser
-    // Create metadata for two envelopes (1 and 5) that are part of the same 
+    // Create metadata for two envelopes (1 and 5) that are part of the same
     // stream, but have different partitions and offsets.
     val env1Metadata = new SystemStreamPartitionMetadata(null, "123", null)
     val env5Metadata = new SystemStreamPartitionMetadata(null, "321", null)
@@ -75,28 +76,28 @@ class TestDefaultChooser {
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.register(envelope3.getSystemStreamPartition, null)
     chooser.register(envelope5.getSystemStreamPartition, null)
-    // Add a bootstrap stream that's already caught up. If everything is 
+    // Add a bootstrap stream that's already caught up. If everything is
     // working properly, it shouldn't interfere with anything.
     chooser.register(envelope8.getSystemStreamPartition, "654")
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load with a non-bootstrap stream, and should still get null.
     chooser.update(envelope3)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load with a bootstrap stream, should get that envelope.
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
 
     // Should block envelope3 since we have no message from envelope1's bootstrap stream.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load envelope2 from non-bootstrap stream with higher priority than envelope3.
     chooser.update(envelope2)
 
     // Should block envelope2 since we have no message from envelope1's bootstrap stream.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Test batching by giving chooser envelope1 and envelope5, both from same stream, but envelope1 should be preferred partition.
     chooser.update(envelope5)
@@ -107,14 +108,14 @@ class TestDefaultChooser {
     chooser.update(envelope1)
     assertEquals(envelope5, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset.
     chooser.update(envelope4)
     assertEquals(envelope4, chooser.choose)
 
     // Should still block envelopes 1 and 2 because the second partition hasn't caught up yet.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Now catch up the second partition.
     chooser.update(envelope6)
@@ -135,7 +136,7 @@ class TestDefaultChooser {
 
     // Now we should finally get the lowest priority non-bootstrap stream, envelope3.
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -166,4 +167,4 @@ class TestDefaultChooser {
 class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
   def start = Unit
   def stop = Unit
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
index 01802b9..1329e84 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
 import org.apache.samza.Partition
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
 
 @RunWith(value = classOf[Parameterized])
 class TestRoundRobinChooser(getChooser: () => MessageChooser) {
@@ -43,12 +44,12 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     chooser.register(envelope3.getSystemStreamPartition, "123")
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Test one message.
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify simple ordering.
     chooser.update(envelope1)
@@ -58,7 +59,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify mixed ordering.
     chooser.update(envelope2)
@@ -72,7 +73,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
 
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify simple ordering with different starting envelope.
     chooser.update(envelope2)
@@ -82,7 +83,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 }
 
@@ -92,4 +93,4 @@ object TestRoundRobinChooser {
   // plain vanilla round robin chooser.
   @Parameters
   def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
index 4cde630..3e435ae 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
@@ -19,18 +19,20 @@
 
 package org.apache.samza.system.chooser
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) {
@@ -68,10 +70,10 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -85,7 +87,7 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     // The SSP for envelope2 is not defined as a priority stream.
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     try {
       chooser.update(envelope2)
@@ -106,18 +108,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -132,18 +134,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope3.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     chooser.update(envelope3)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope3)
     chooser.update(envelope2)
     assertEquals(envelope3, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -160,30 +162,30 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope2)
     chooser.update(envelope4)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope1)
     chooser.update(envelope2)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -203,18 +205,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope2)
     chooser.update(envelope4)
@@ -222,18 +224,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     // priority.
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope1)
     chooser.update(envelope2)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Just the low priority stream.
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 }
 
@@ -245,4 +247,4 @@ object TestTieredPriorityChooser {
   def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList(
     Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)),
     Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers)))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
index fb26bfc..525d126 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
@@ -19,20 +19,22 @@
 
 package org.apache.samza.system.filereader
 
-import org.junit.Assert._
-import scala.collection.JavaConversions._
 import java.io.PrintWriter
 import java.io.File
-import org.scalatest.junit.AssertionsForJUnit
-import org.junit.Test
-import org.junit.Before
-import org.junit.After
 import java.io.RandomAccessFile
+
+import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.Before
+import org.junit.After
+import org.scalatest.junit.AssertionsForJUnit
+
 import scala.collection.mutable.HashMap
-import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
 
 class TestFileReaderSystemAdmin extends AssertionsForJUnit {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index f505eb1..5707bb4 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -22,12 +22,14 @@ package org.apache.samza.system.filereader
 import java.io.File
 import java.io.FileWriter
 import java.io.PrintWriter
+
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.AfterClass
 import org.junit.Assert._
 import org.junit.BeforeClass
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
index 330df78..c3295f3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
@@ -19,11 +19,12 @@
 
 package org.apache.samza.system.filereader
 
+import org.apache.samza.SamzaException
 import org.junit.Assert._
-import scala.collection.JavaConversions._
-import org.scalatest.junit.AssertionsForJUnit
 import org.junit.Test
-import org.apache.samza.SamzaException
+import org.scalatest.junit.AssertionsForJUnit
+
+import scala.collection.JavaConversions._
 
 class TestFileReaderSystemFactory extends AssertionsForJUnit {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index 7cfeb5a..c141b5f 100644
--- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -19,10 +19,10 @@
 
 package org.apache.samza.task
 
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.Test
 
 class TestReadableCoordinator {
   val taskName = new TaskName("P0")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
index 6353378..ee56e20 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.util
 
 import org.junit.Assert._
@@ -28,9 +29,9 @@ class TestDaemonThreadFactory {
     val dtf = new DaemonThreadFactory(testThreadName)
     val threadWithName = dtf.newThread(new Runnable {
       def run() {
-        //Not testing this particular method
+        // Not testing this particular method
       }
     })
-    assertEquals(threadWithName.getName, ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName)
+    assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 4a561d1..9ba8a4d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,14 +15,13 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.samza.util
 
+import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 
 class TestExponentialSleepStrategy {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 7c314ce..8c21901 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.util
 
 import org.apache.samza.Partition
@@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.Util._
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 import scala.util.Random
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
index 7a23041..b76d5ad 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.checkpoint.kafka
 
+import org.apache.samza.SamzaException
 import org.apache.samza.container.TaskName
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import org.apache.samza.SamzaException
 
 class TestKafkaCheckpointLogKey {
   @Before

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 4827731..553d6b4 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.checkpoint.kafka
 
+import kafka.admin.AdminUtils
 import kafka.common.InvalidMessageSizeException
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.message.InvalidMessageException
@@ -31,20 +32,20 @@ import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
+
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
 import org.apache.samza.{ SamzaException, Partition }
 import org.junit.Assert._
 import org.junit.{ AfterClass, BeforeClass, Test }
-import scala.collection.JavaConversions._
+
 import scala.collection._
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import kafka.admin.AdminUtils
-import org.apache.samza.config.MapConfig
 import scala.collection.JavaConversions._
 
 object TestKafkaCheckpointManager {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 468aa3d..8109f73 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -19,15 +19,18 @@
 
 package org.apache.samza.config
 
-import org.junit.Assert._
-import org.junit.Test
 import java.net.URI
 import java.io.File
 import java.util.Properties
-import scala.collection.JavaConversions._
-import org.apache.samza.config.factories.PropertiesConfigFactory
+
 import kafka.consumer.ConsumerConfig
 
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
 class TestKafkaConfig {
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index fabae68..5cf82c2 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -18,10 +18,12 @@
  */
 
 package org.apache.samza.config
+
+import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 
 class TestKafkaSerdeConfig {
   val MAGIC_VAL = "1000"
@@ -33,7 +35,7 @@ class TestKafkaSerdeConfig {
 
   @Test
   def testKafkaConfigurationIsBackwardsCompatible {
-    assert(config.getKafkaEncoder("test").getOrElse("").equals(MAGIC_VAL))
-    assert(config.getKafkaDecoder("test").getOrElse("").equals(MAGIC_VAL))
+    assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse(""))
+    assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse(""))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 77cdbe3..89ced34 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -19,11 +19,13 @@
 
 package org.apache.samza.config
 
-import org.junit.Test
 import collection.JavaConversions._
+
+import org.apache.samza.SamzaException
 import org.junit.Assert._
+import org.junit.Test
+
 import KafkaConfig._
-import org.apache.samza.SamzaException
 
 class TestRegExTopicGenerator {