You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/09/16 18:29:34 UTC

[2/2] git commit: SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods

SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/811f2897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/811f2897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/811f2897

Branch: refs/heads/master
Commit: 811f2897c640c684f4506a6bab7590304de34fca
Parents: cb40a59
Author: David Chen <dc...@linkedin.com>
Authored: Tue Sep 16 09:29:20 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Sep 16 09:29:20 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/TestConfig.java     |  45 ++--
 .../metrics/TestSlidingTimeWindowReservoir.java |  20 +-
 .../org/apache/samza/metrics/TestTimer.java     |  14 +-
 .../samza/util/TestBlockingEnvelopeMap.java     |   2 +
 .../samza/util/TestNoOpMetricsRegistry.java     |  26 ++-
 ...inglePartitionWithoutOffsetsSystemAdmin.java |  10 +-
 .../samza/checkpoint/TestCheckpointTool.scala   |   3 +-
 .../samza/checkpoint/TestOffsetManager.scala    |  11 +-
 .../factories/TestPropertiesConfigFactory.scala |   4 +-
 .../samza/container/TestSamzaContainer.scala    |  14 +-
 .../samza/container/TestTaskInstance.scala      |  29 +--
 .../TestTaskNamesToSystemStreamPartitions.scala |   6 +-
 .../grouper/stream/GroupByTestBase.scala        |  13 +-
 .../grouper/stream/TestGroupByPartition.scala   |  10 +-
 .../TestGroupBySystemStreamPartition.scala      |  15 +-
 .../org/apache/samza/job/TestJobRunner.scala    |   5 +-
 .../samza/job/TestShellCommandBuilder.scala     |   2 +-
 .../apache/samza/job/local/TestProcessJob.scala |   3 +-
 .../apache/samza/job/local/TestThreadJob.scala  |   3 +-
 .../apache/samza/metrics/TestJmxServer.scala    |  17 +-
 .../metrics/reporter/TestJmxReporter.scala      |  19 +-
 .../samza/serializers/TestByteSerde.scala       |  11 +-
 .../samza/serializers/TestCheckpointSerde.scala |   2 +
 .../samza/serializers/TestIntegerSerde.scala    |   7 +-
 .../samza/serializers/TestStringSerde.scala     |   5 +-
 .../samza/system/TestSystemConsumers.scala      |  19 +-
 .../system/chooser/TestBatchingChooser.scala    |  22 +-
 .../chooser/TestBootstrappingChooser.scala      |  70 +++---
 .../system/chooser/TestDefaultChooser.scala     |  23 +-
 .../system/chooser/TestRoundRobinChooser.scala  |  19 +-
 .../chooser/TestTieredPriorityChooser.scala     |  56 ++---
 .../filereader/TestFileReaderSystemAdmin.scala  |  16 +-
 .../TestFileReaderSystemConsumer.scala          |   2 +
 .../TestFileReaderSystemFactory.scala           |   7 +-
 .../samza/task/TestReadableCoordinator.scala    |   4 +-
 .../samza/util/TestDaemonThreadFactory.scala    |   7 +-
 .../util/TestExponentialSleepStrategy.scala     |   4 +-
 .../scala/org/apache/samza/util/TestUtil.scala  |   2 +
 .../kafka/TestKafkaCheckpointLogKey.scala       |   2 +-
 .../kafka/TestKafkaCheckpointManager.scala      |   9 +-
 .../apache/samza/config/TestKafkaConfig.scala   |  11 +-
 .../samza/config/TestKafkaSerdeConfig.scala     |   8 +-
 .../samza/config/TestRegExTopicGenerator.scala  |   6 +-
 .../samza/serializers/TestKafkaSerde.scala      |   8 +-
 .../samza/system/kafka/TestBrokerProxy.scala    |  26 +--
 .../samza/system/kafka/TestGetOffset.scala      |  16 +-
 .../system/kafka/TestKafkaSystemAdmin.scala     |  37 ++--
 .../system/kafka/TestKafkaSystemConsumer.scala  |  15 +-
 .../system/kafka/TestKafkaSystemFactory.scala   |  11 +-
 .../system/kafka/TestKafkaSystemProducer.scala  |  24 ++-
 .../system/kafka/TestTopicMetadataCache.scala   |  14 +-
 .../samza/storage/kv/TestKeyValueStores.scala   |  16 +-
 .../samza/logging/log4j/TestJmxAppender.java    |   5 +-
 .../samza/serializers/TestJsonSerde.scala       |   4 +-
 .../serializers/TestMetricsSnapshotSerde.scala  |  14 +-
 .../test/integration/TestStatefulTask.scala     |  11 +-
 .../TestSamzaContainerPerformance.scala         |  17 +-
 .../samza/job/yarn/TestSamzaAppMaster.scala     |  25 +--
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  16 +-
 .../job/yarn/TestSamzaAppMasterService.scala    |  18 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 214 +++++++++----------
 61 files changed, 587 insertions(+), 487 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index e701296..b4100c2 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -19,13 +19,18 @@
 
 package org.apache.samza.config;
 
-import org.junit.Assert.* ;
-import org.junit.Test ;
-import java.util.Map ;
-import java.util.HashMap ;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
 
 public class TestConfig {
-  // Utility methods to make it easier to tell the class of a primitive via overloaded args
+  /**
+   * Utility methods to make it easier to tell the class of a primitive via
+   * overloaded args
+   */
   Class getClass(long l) {
     return Long.class ;
   }
@@ -35,25 +40,25 @@ public class TestConfig {
   }
 
   @Test
-  public void testgetShortAndLong(){
-    Map<String, String> m = new HashMap<String, String>() { {
-      put("testkey", "11") ;
-    } } ;
+  public void testgetShortAndLong() {
+    Map<String, String> m = new HashMap<String, String>() {{
+      put("testkey", "11");
+    }};
 
-    MapConfig mc = new MapConfig(m) ;
-    short defaultShort=0 ;
-    long defaultLong=0 ;
+    MapConfig mc = new MapConfig(m);
+    short defaultShort = 0;
+    long defaultLong = 0;
 
-    Class c1 = getClass(mc.getShort("testkey")) ;
-    assert(c1 == Short.class) ;
+    Class c1 = getClass(mc.getShort("testkey"));
+    assertEquals(Short.class, c1);
 
-    Class c2 = getClass(mc.getShort("testkey", defaultShort)) ;
-    assert(c2 == Short.class) ;
+    Class c2 = getClass(mc.getShort("testkey", defaultShort));
+    assertEquals(Short.class, c2);
 
-    Class c3 = getClass(mc.getLong("testkey")) ;
-    assert(c3 == Long.class) ;
+    Class c3 = getClass(mc.getLong("testkey"));
+    assertEquals(Long.class, c3);
 
-    Class c4 = getClass(mc.getLong("testkey", defaultLong)) ;
-    assert(c4 == Long.class) ;
+    Class c4 = getClass(mc.getLong("testkey", defaultLong));
+    assertEquals(Long.class, c4);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
index eb5043b..d392b32 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.metrics;
 
+import static org.mockito.Mockito.*;
 import static org.junit.Assert.*;
 
 import java.util.Arrays;
@@ -26,15 +27,14 @@ import java.util.Arrays;
 import org.apache.samza.util.Clock;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
-
 public class TestSlidingTimeWindowReservoir {
 
   private final Clock clock = mock(Clock.class);
 
   @Test
   public void testUpdateSizeSnapshot() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
 
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
@@ -49,24 +49,26 @@ public class TestSlidingTimeWindowReservoir {
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
-    assertTrue(snapshot.getSize() == 3);
+    assertEquals(3, snapshot.getSize());
   }
 
   @Test
   public void testDuplicateTime() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
     slidingTimeWindowReservoir.update(2L);
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
-    assertTrue(snapshot.getSize() == 2);
+    assertEquals(2, snapshot.getSize());
   }
 
   @Test
   public void testRemoveExpiredValues() {
-    SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+    SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+        new SlidingTimeWindowReservoir(300, clock);
     when(clock.currentTimeMillis()).thenReturn(0L);
     slidingTimeWindowReservoir.update(1L);
 
@@ -81,6 +83,6 @@ public class TestSlidingTimeWindowReservoir {
 
     Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
-    assertTrue(snapshot.getSize() == 2);
+    assertEquals(2, snapshot.getSize());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index dcc3cb8..63c183f 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -28,7 +28,9 @@ import org.junit.Test;
 
 public class TestTimer {
 
-  // mock clock
+  /*
+   * Mock clock
+   */
   private final Clock clock = new Clock() {
     long value = 0;
 
@@ -46,7 +48,7 @@ public class TestTimer {
 
     Snapshot snapshot = timer.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
-    assertTrue(snapshot.getValues().size() == 2);
+    assertEquals(2, snapshot.getValues().size());
   }
 
   @Test
@@ -58,13 +60,13 @@ public class TestTimer {
 
     Snapshot snapshot = timer.getSnapshot();
     assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
-    assertTrue(snapshot.getValues().size() == 3);
+    assertEquals(3, snapshot.getValues().size());
 
-    // the time is 500 for update(4L) because getSnapshot calls clock once + 3
+    // The time is 500 for update(4L) because getSnapshot calls clock once + 3
     // updates that call clock 3 times
     timer.update(4L);
     Snapshot snapshot2 = timer.getSnapshot();
     assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L)));
-    assertTrue(snapshot2.getValues().size() == 2);
+    assertEquals(2, snapshot2.getValues().size());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 35ba52d..4eb87eb 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.samza.Partition;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
index 2d0034f..1d1e3c5 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
@@ -33,32 +33,38 @@ public class TestNoOpMetricsRegistry {
     Counter counter1 = registry.newCounter("testc", "a");
     Counter counter2 = registry.newCounter("testc", "b");
     Counter counter3 = registry.newCounter("testc2", "c");
+
     Gauge<String> gauge1 = registry.newGauge("testg", "a", "1");
     Gauge<String> gauge2 = registry.newGauge("testg", "b", "2");
     Gauge<String> gauge3 = registry.newGauge("testg", "c", "3");
     Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4");
+
     Timer timer1 = registry.newTimer("testt", "a");
     Timer timer2 = registry.newTimer("testt", "b");
     Timer timer3 = registry.newTimer("testt2", "c");
+
     counter1.inc();
     counter2.inc(2);
     counter3.inc(4);
+
     gauge1.set("5");
     gauge2.set("6");
     gauge3.set("7");
     gauge4.set("8");
+
     timer1.update(1L);
     timer2.update(2L);
     timer3.update(3L);
-    assertEquals(counter1.getCount(), 1);
-    assertEquals(counter2.getCount(), 2);
-    assertEquals(counter3.getCount(), 4);
-    assertEquals(gauge1.getValue(), "5");
-    assertEquals(gauge2.getValue(), "6");
-    assertEquals(gauge3.getValue(), "7");
-    assertEquals(gauge4.getValue(), "8");
-    assertEquals(timer1.getSnapshot().getAverage(), 1, 0);
-    assertEquals(timer2.getSnapshot().getAverage(), 2, 0);
-    assertEquals(timer3.getSnapshot().getAverage(), 3, 0);
+
+    assertEquals(1, counter1.getCount());
+    assertEquals(2, counter2.getCount());
+    assertEquals(4, counter3.getCount());
+    assertEquals("5", gauge1.getValue());
+    assertEquals("6", gauge2.getValue());
+    assertEquals("7", gauge3.getValue());
+    assertEquals("8", gauge4.getValue());
+    assertEquals(1, timer1.getSnapshot().getAverage(), 0);
+    assertEquals(2, timer2.getSnapshot().getAverage(), 0);
+    assertEquals(3, timer3.getSnapshot().getAverage(), 0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
index 4166493..025f0a6 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
@@ -20,10 +20,12 @@
 package org.apache.samza.util;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.samza.Partition;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -36,12 +38,14 @@ public class TestSinglePartitionWithoutOffsetsSystemAdmin {
     Set<String> streamNames = new HashSet<String>();
     streamNames.add("a");
     streamNames.add("b");
+
     Map<String, SystemStreamMetadata> metadata = admin.getSystemStreamMetadata(streamNames);
-    assertEquals(metadata.size(), 2);
+    assertEquals(2, metadata.size());
     SystemStreamMetadata metadata1 = metadata.get("a");
     SystemStreamMetadata metadata2 = metadata.get("b");
+
     assertEquals(1, metadata1.getSystemStreamPartitionMetadata().size());
     assertEquals(1, metadata2.getSystemStreamPartitionMetadata().size());
-    assertEquals(null, metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
+    assertNull(metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 1eb3995..af800df 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.checkpoint
 
 import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
 import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
 import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
 import org.apache.samza.metrics.MetricsRegistry
@@ -30,8 +31,8 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mock.MockitoSugar
+
 import scala.collection.JavaConversions._
-import org.apache.samza.container.TaskName
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = null

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 44a98a5..a79ecca 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -19,7 +19,9 @@
 
 package org.apache.samza.checkpoint
 
-import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.samza.container.TaskName
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
@@ -30,10 +32,10 @@ import org.junit.{Ignore, Test}
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.system.SystemAdmin
-import java.util
-import org.apache.samza.container.TaskName
 import org.scalatest.Assertions.intercept
 
+import scala.collection.JavaConversions._
+
 class TestOffsetManager {
   @Test
   def testSystemShouldUseDefaults {
@@ -47,7 +49,7 @@ class TestOffsetManager {
     val offsetManager = OffsetManager(systemStreamMetadata, config)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
-    assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
+    assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
     assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
     assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
   }
@@ -232,7 +234,6 @@ class TestOffsetManager {
       override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
 
       override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
index f254741..9688abb 100644
--- a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
@@ -18,8 +18,10 @@
  */
 
 package org.apache.samza.config.factories
+
 import java.net.URI
 import java.io.File
+
 import org.apache.samza.SamzaException
 import org.junit.Assert._
 import org.junit.Test
@@ -30,7 +32,7 @@ class TestPropertiesConfigFactory {
   @Test
   def testCanReadPropertiesConfigFiles {
     val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
-    assert("bar".equals(config.get("foo")))
+    assertEquals("bar", config.get("foo"))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 8a04a8a..b7a9569 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -24,26 +24,26 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system.SystemConsumer
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.InitableTask
 import org.apache.samza.task.TaskContext
 import org.apache.samza.task.ClosableTask
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.scalatest.junit.AssertionsForJUnit
-import org.apache.samza.metrics.JmxServer
 
 class TestSamzaContainer extends AssertionsForJUnit {
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index be53373..c31a74e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -21,27 +21,28 @@ package org.apache.samza.container
 
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.config.MapConfig
 import org.apache.samza.Partition
-import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.SystemProducer
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.MapConfig
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.chooser.RoundRobinChooser
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
 import org.apache.samza.task.TaskInstanceCollector
 
+import scala.collection.JavaConversions._
+
 class TestTaskInstance {
   @Test
   def testOffsetsAreUpdatedOnProcess {
@@ -80,4 +81,4 @@ class TestTaskInstance {
     assertTrue(lastProcessedOffset.isDefined)
     assertEquals("2", lastProcessedOffset.get)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
index d680b20..9a3406e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.container
 
-import org.junit.Test
-import org.junit.Assert._
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.{SamzaException, Partition}
+import org.junit.Test
+import org.junit.Assert._
 
 class TestTaskNamesToSystemStreamPartitions {
   var sspCounter = 0
@@ -36,7 +36,7 @@ class TestTaskNamesToSystemStreamPartitions {
     val asSet = tntssp.toSet
     val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
                       (new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2"))))
-    assertEquals(expected , asSet)
+    assertEquals(expected, asSet)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
index 47d716e..a14169b 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
@@ -18,15 +18,16 @@
  */
 package org.apache.samza.container.grouper.stream
 
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
+import java.util.Collections
 import java.util.HashSet
 import java.util.Map
 import java.util.Set
-import org.junit.Assert._
-import java.util.Collections
+
+import org.apache.samza.Partition
 import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import org.junit.Assert._
 
 object GroupByTestBase {
   val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
@@ -54,4 +55,4 @@ abstract class GroupByTestBase {
     val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
     assertEquals(output, result)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
index 2fa718c..74daf72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
@@ -19,15 +19,17 @@
 package org.apache.samza.container.grouper.stream
 
 import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
 import org.junit.Test
 
+import scala.collection.JavaConverters._
+
 class TestGroupByPartition extends GroupByTestBase {
   import GroupByTestBase._
 
-  val expected /* from base class provided set */ =  Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
-                                                         new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
-                                                         new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+  // from base class provided set
+  val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+                     new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+                     new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
 
   override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
index 8da0595..deb3895 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
@@ -19,19 +19,20 @@
 package org.apache.samza.container.grouper.stream
 
 import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
 import org.junit.Test
+import scala.collection.JavaConverters._
 
 class TestGroupBySystemStreamPartition extends GroupByTestBase {
   import GroupByTestBase._
 
   // Building manually to avoid just duplicating a logic potential logic error here and there
-  val expected /* from base class provided set */ =  Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
-    new TaskName(aa1.toString) -> Set(aa1).asJava,
-    new TaskName(aa2.toString) -> Set(aa2).asJava,
-    new TaskName(ab1.toString) -> Set(ab1).asJava,
-    new TaskName(ab2.toString) -> Set(ab2).asJava,
-    new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+  // From base class provided set
+  val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+                     new TaskName(aa1.toString) -> Set(aa1).asJava,
+                     new TaskName(aa2.toString) -> Set(aa2).asJava,
+                     new TaskName(ab1.toString) -> Set(ab1).asJava,
+                     new TaskName(ab2.toString) -> Set(ab2).asJava,
+                     new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
 
   override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 258ccc1..52057ed 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -18,9 +18,12 @@
  */
 
 package org.apache.samza.job
+
 import java.io.File
+
 import org.apache.samza.config.Config
 import org.junit.Test
+import org.junit.Assert._
 
 object TestJobRunner {
   var processCount = 0
@@ -34,7 +37,7 @@ class TestJobRunner {
       "org.apache.samza.config.factories.PropertiesConfigFactory",
       "--config-path",
       "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
-    assert(TestJobRunner.processCount == 1)
+    assertEquals(1, TestJobRunner.processCount)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index f8a535a..b186ec1 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -18,12 +18,12 @@
  */
 package org.apache.samza.job
 
-import org.junit.Test
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.util.Util._
 import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
 import org.junit.Assert._
+import org.junit.Test
 
 class TestShellCommandBuilder {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index d56024d..7f3ccfe 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.job.local;
+
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -39,6 +40,6 @@ class TestProcessJob {
     job.waitForFinish(500)
     job.kill
     job.waitForFinish(999999)
-    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index 7d45889..4f3f511 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -18,6 +18,7 @@
  */
 
 package org.apache.samza.job.local
+
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
@@ -44,6 +45,6 @@ class TestThreadJob {
     job.waitForFinish(500)
     job.kill
     job.waitForFinish(999999)
-    assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
index f01117d..f49cfaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -22,14 +22,15 @@ package org.apache.samza.metrics
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.util.Logging
-import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+
 import java.io.IOException
 
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 
 class TestJmxServer extends Logging {
   @Test
   def serverStartsUp {
-    var jmxServer:JmxServer = null
+    var jmxServer: JmxServer = null
 
     try {
       jmxServer = new JmxServer
@@ -45,14 +46,16 @@ class TestJmxServer extends Logging {
         assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0)
       } catch {
         case ioe:IOException => fail("Couldn't open connection to local JMX server")
-      }finally {
-        if(jmxConnector != null) jmxConnector.close
+      } finally {
+        if (jmxConnector != null) {
+          jmxConnector.close
+        }
       }
 
     } finally {
-      if (jmxServer != null) jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
     }
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index f6c8646..3cfd439 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -23,20 +23,23 @@ import org.junit.Assert._
 import org.junit.AfterClass
 import org.junit.BeforeClass
 import org.junit.Test
-import scala.collection.JavaConversions._
 import org.apache.samza.task.TaskContext
-import javax.management.remote.JMXConnectorFactory
 import org.apache.samza.metrics.MetricsRegistryMap
-import javax.management.remote.JMXConnectorServerFactory
-import javax.management.remote.JMXConnectorServer
-import java.rmi.registry.LocateRegistry
-import javax.management.remote.JMXServiceURL
 import org.apache.samza.config.MapConfig
-import java.lang.management.ManagementFactory
 import org.apache.samza.Partition
-import javax.management.ObjectName
 import org.apache.samza.metrics.JvmMetrics
 
+import java.lang.management.ManagementFactory
+import java.rmi.registry.LocateRegistry
+
+import javax.management.ObjectName
+import javax.management.remote.JMXServiceURL
+import javax.management.remote.JMXConnectorServerFactory
+import javax.management.remote.JMXConnectorServer
+import javax.management.remote.JMXConnectorFactory
+
+import scala.collection.JavaConversions._
+
 object TestJmxReporter {
   val port = 4500
   val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
index f64c263..f605762 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
@@ -19,19 +19,20 @@
 
 package org.apache.samza.serializers
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestByteSerde {
   @Test
   def testByteSerde {
     val serde = new ByteSerde
-    assertEquals(null, serde.toBytes(null))
-    assertEquals(null, serde.fromBytes(null))
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
 
     val testBytes = "A lazy way of creating a byte array".getBytes()
-    assertTrue(Arrays.equals(serde.toBytes(testBytes), testBytes))
-    assertTrue( Arrays.equals(serde.fromBytes(testBytes), testBytes))
+    assertArrayEquals(serde.toBytes(testBytes), testBytes)
+    assertArrayEquals(serde.fromBytes(testBytes), testBytes)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 0d07314..3d0a603 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -20,12 +20,14 @@
 package org.apache.samza.serializers
 
 import java.util
+
 import org.apache.samza.Partition
 import org.apache.samza.checkpoint.Checkpoint
 import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 
 class TestCheckpointSerde {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
index 45a2b04..ad646d7 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
@@ -19,9 +19,10 @@
 
 package org.apache.samza.serializers
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestIntegerSerde {
   @Test
@@ -33,7 +34,7 @@ class TestIntegerSerde {
     val fooBar = 37
     val fooBarBytes = serde.toBytes(fooBar)
     fooBarBytes.foreach(System.err.println)
-    assertTrue(Arrays.equals(Array[Byte](0, 0, 0, 37), fooBarBytes))
+    assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
     assertEquals(fooBar, serde.fromBytes(fooBarBytes))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
index 7fbf0c2..a1e8e88 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
@@ -21,7 +21,6 @@ package org.apache.samza.serializers
 
 import org.junit.Assert._
 import org.junit.Test
-import java.util.Arrays
 
 class TestStringSerde {
   @Test
@@ -32,7 +31,7 @@ class TestStringSerde {
 
     val fooBar = "foo bar"
     val fooBarBytes = serde.toBytes(fooBar)
-    assertTrue(Arrays.equals(fooBar.getBytes("UTF-8"), fooBarBytes))
+    assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
     assertEquals(fooBar, serde.fromBytes(fooBarBytes))
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 04229a6..3fdc781 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.system
 
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.serializers._
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.serializers._
 import org.apache.samza.system.chooser.MockMessageChooser
+import org.apache.samza.util.BlockingEnvelopeMap
+
+import scala.collection.JavaConversions._
 
 class TestSystemConsumers {
   def testPollIntervalMs {
@@ -44,7 +45,7 @@ class TestSystemConsumers {
     consumers.register(systemStreamPartition1, "1234")
     consumers.start
 
-    // Tell the consumer to respond with 1000 messages for SSP0, and no 
+    // Tell the consumer to respond with 1000 messages for SSP0, and no
     // messages for SSP1.
     consumer.setResponseSizes(numEnvelopes)
 
@@ -60,13 +61,13 @@ class TestSystemConsumers {
     // We aren't polling because we're getting non-null envelopes.
     assertEquals(2, consumer.polls)
 
-    // Advance the clock to trigger a new poll even though there are still 
+    // Advance the clock to trigger a new poll even though there are still
     // messages.
     now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
 
     assertEquals(envelope, consumers.choose)
 
-    // We polled even though there are still 997 messages in the unprocessed 
+    // We polled even though there are still 997 messages in the unprocessed
     // message buffer.
     assertEquals(3, consumer.polls)
     assertEquals(1, consumer.lastPoll.size)
@@ -74,7 +75,7 @@ class TestSystemConsumers {
     // Only SSP1 was polled because we still have messages for SSP2.
     assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
 
-    // Now drain all messages for SSP0. There should be exactly 997 messages, 
+    // Now drain all messages for SSP0. There should be exactly 997 messages,
     // since we have chosen 3 already, and we started with 1000.
     (0 until (numEnvelopes - 3)).foreach { i =>
       assertEquals(envelope, consumers.choose)
@@ -296,4 +297,4 @@ class TestSystemConsumers {
     def stop {}
     def register { super.register(systemStreamPartition, "0") }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
index d7632b4..6d53697 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
@@ -19,16 +19,18 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
+import org.apache.samza.Partition
 import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
@@ -45,9 +47,9 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
     chooser.start
     // Make sure start and register are working.
     assertEquals(1, mock.starts)
-    assertEquals(null, mock.registers(envelope1.getSystemStreamPartition))
+    assertNull(mock.registers(envelope1.getSystemStreamPartition))
     assertEquals("", mock.registers(envelope2.getSystemStreamPartition))
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     assertEquals(envelope1, mock.getEnvelopes.head)
     assertEquals(envelope1, chooser.choose)
@@ -84,11 +86,11 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
 }
 
 object TestBatchingChooser {
-  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with 
-  // just batch size defined should behave just like plain vanilla batching 
+  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+  // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Int) => MessageChooser]] = Arrays.asList(
     Array((wrapped: MessageChooser, batchSize: Int) => new BatchingChooser(wrapped, batchSize)),
     Array((wrapped: MessageChooser, batchSize: Int) => new DefaultChooser(wrapped, Some(batchSize))))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 993daa6..3c2693c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -19,20 +19,22 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
 import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -61,7 +63,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition))
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.stop
     assertEquals(1, mock.stops)
   }
@@ -72,16 +74,16 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val metadata = getMetadata(envelope1, "100", Some("123"))
     val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
 
-    // Even though envelope1's SSP is registered as a bootstrap stream, since 
-    // 123=123, it should be marked as "caught up" and treated like a normal 
-    // stream. This means that non-bootstrap stream envelope should be allowed 
+    // Even though envelope1's SSP is registered as a bootstrap stream, since
+    // 123=123, it should be marked as "caught up" and treated like a normal
+    // stream. This means that non-bootstrap stream envelope should be allowed
     // to be chosen.
     chooser.register(envelope1.getSystemStreamPartition, "123")
     chooser.register(envelope2.getSystemStreamPartition, "321")
     chooser.start
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -90,40 +92,40 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val metadata = getMetadata(envelope1, "123")
     val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
 
-    // Even though envelope1's SSP is registered as a bootstrap stream, since 
-    // 123=123, it should be marked as "caught up" and treated like a normal 
-    // stream. This means that non-bootstrap stream envelope should be allowed 
+    // Even though envelope1's SSP is registered as a bootstrap stream, since
+    // 123=123, it should be marked as "caught up" and treated like a normal
+    // stream. This means that non-bootstrap stream envelope should be allowed
     // to be chosen.
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
     chooser.update(envelope2)
-    // Choose should not return anything since bootstrapper is blocking 
+    // Choose should not return anything since bootstrapper is blocking
     // wrapped.choose until it gets an update from envelope1's SSP.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
-    // Now that we have an update from the required SSP, the mock chooser 
+    // Now that we have an update from the required SSP, the mock chooser
     // should be called, and return.
     assertEquals(envelope2, chooser.choose)
-    // The chooser still has an envelope from envelope1's SSP, so it should 
+    // The chooser still has an envelope from envelope1's SSP, so it should
     // return.
     assertEquals(envelope1, chooser.choose)
     // No envelope for envelope1's SSP has been given, so it should block.
     chooser.update(envelope2)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Now we're giving an envelope with the proper last offset (123), so no
     // envelope1's SSP should be treated no differently than envelope2's.
     chooser.update(envelope4)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Should not block here since there are no more lagging bootstrap streams.
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -138,54 +140,54 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     chooser.register(envelope3.getSystemStreamPartition, "1")
     chooser.start
     chooser.update(envelope1)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope3)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
 
     // Fully loaded now.
     assertEquals(envelope1, chooser.choose)
     // Can't pick again because envelope1's SSP is missing.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     // Can pick again.
     assertEquals(envelope3, chooser.choose)
     // Can still pick since envelope3.SSP isn't being tracked.
     assertEquals(envelope2, chooser.choose)
     // Can't pick since envelope2.SSP needs an envelope now.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     // Now we get envelope1 again.
     assertEquals(envelope1, chooser.choose)
     // Can't pick again.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Now use envelope4, to trigger "all caught up" for envelope1.SSP.
     chooser.update(envelope4)
     // Chooser's contents is currently: e2, e4 (System.err.println(mock.getEnvelopes))
     // Add envelope3, whose SSP isn't being tracked.
     chooser.update(envelope3)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     // Chooser's contents is currently: e4, e3, e2 (System.err.println(mock.getEnvelopes))
     assertEquals(envelope4, chooser.choose)
-    // This should be allowed, even though no message from envelope1.SSP is 
-    // available, since envelope4 triggered "all caught up" because its offset 
-    // matches the offset map for this SSP, and we still have an envelope for 
+    // This should be allowed, even though no message from envelope1.SSP is
+    // available, since envelope4 triggered "all caught up" because its offset
+    // matches the offset map for this SSP, and we still have an envelope for
     // envelope2.SSP in the queue.
     assertEquals(envelope3, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     // Fin.
   }
 }
 
 object TestBootstrappingChooser {
-  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with 
-  // just batch size defined should behave just like plain vanilla batching 
+  // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+  // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
     Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)),
     Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata)))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 884e458..0909956 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -26,12 +26,13 @@ import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.config.MapConfig
-import scala.collection.JavaConversions._
 import org.apache.samza.config.DefaultChooserConfig
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 
+import scala.collection.JavaConversions._
+
 class TestDefaultChooser {
   val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
   val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
@@ -47,7 +48,7 @@ class TestDefaultChooser {
     val mock0 = new MockMessageChooser
     val mock1 = new MockMessageChooser
     val mock2 = new MockMessageChooser
-    // Create metadata for two envelopes (1 and 5) that are part of the same 
+    // Create metadata for two envelopes (1 and 5) that are part of the same
     // stream, but have different partitions and offsets.
     val env1Metadata = new SystemStreamPartitionMetadata(null, "123", null)
     val env5Metadata = new SystemStreamPartitionMetadata(null, "321", null)
@@ -75,28 +76,28 @@ class TestDefaultChooser {
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.register(envelope3.getSystemStreamPartition, null)
     chooser.register(envelope5.getSystemStreamPartition, null)
-    // Add a bootstrap stream that's already caught up. If everything is 
+    // Add a bootstrap stream that's already caught up. If everything is
     // working properly, it shouldn't interfere with anything.
     chooser.register(envelope8.getSystemStreamPartition, "654")
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load with a non-bootstrap stream, and should still get null.
     chooser.update(envelope3)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load with a bootstrap stream, should get that envelope.
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
 
     // Should block envelope3 since we have no message from envelope1's bootstrap stream.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Load envelope2 from non-bootstrap stream with higher priority than envelope3.
     chooser.update(envelope2)
 
     // Should block envelope2 since we have no message from envelope1's bootstrap stream.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Test batching by giving chooser envelope1 and envelope5, both from same stream, but envelope1 should be preferred partition.
     chooser.update(envelope5)
@@ -107,14 +108,14 @@ class TestDefaultChooser {
     chooser.update(envelope1)
     assertEquals(envelope5, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset.
     chooser.update(envelope4)
     assertEquals(envelope4, chooser.choose)
 
     // Should still block envelopes 1 and 2 because the second partition hasn't caught up yet.
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Now catch up the second partition.
     chooser.update(envelope6)
@@ -135,7 +136,7 @@ class TestDefaultChooser {
 
     // Now we should finally get the lowest priority non-bootstrap stream, envelope3.
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -166,4 +167,4 @@ class TestDefaultChooser {
 class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
   def start = Unit
   def stop = Unit
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
index 01802b9..1329e84 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
@@ -19,15 +19,16 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
 import org.apache.samza.Partition
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
 
 @RunWith(value = classOf[Parameterized])
 class TestRoundRobinChooser(getChooser: () => MessageChooser) {
@@ -43,12 +44,12 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     chooser.register(envelope3.getSystemStreamPartition, "123")
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Test one message.
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify simple ordering.
     chooser.update(envelope1)
@@ -58,7 +59,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify mixed ordering.
     chooser.update(envelope2)
@@ -72,7 +73,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
 
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Verify simple ordering with different starting envelope.
     chooser.update(envelope2)
@@ -82,7 +83,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 }
 
@@ -92,4 +93,4 @@ object TestRoundRobinChooser {
   // plain vanilla round robin chooser.
   @Parameters
   def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
index 4cde630..3e435ae 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
@@ -19,18 +19,20 @@
 
 package org.apache.samza.system.chooser
 
+import java.util.Arrays
+
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+
+import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) {
@@ -68,10 +70,10 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -85,7 +87,7 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     // The SSP for envelope2 is not defined as a priority stream.
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     try {
       chooser.update(envelope2)
@@ -106,18 +108,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -132,18 +134,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope3.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope2)
     chooser.update(envelope3)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope3, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope3)
     chooser.update(envelope2)
     assertEquals(envelope3, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -160,30 +162,30 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope2)
     chooser.update(envelope4)
     assertEquals(envelope2, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope1)
     chooser.update(envelope2)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 
   @Test
@@ -203,18 +205,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     chooser.register(envelope2.getSystemStreamPartition, null)
     chooser.start
 
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
     chooser.update(envelope1)
     chooser.update(envelope4)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope4, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope4)
     chooser.update(envelope1)
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope1, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope2)
     chooser.update(envelope4)
@@ -222,18 +224,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
     // priority.
     assertEquals(envelope4, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     chooser.update(envelope1)
     chooser.update(envelope2)
     assertEquals(envelope1, chooser.choose)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
 
     // Just the low priority stream.
     chooser.update(envelope2)
     assertEquals(envelope2, chooser.choose)
-    assertEquals(null, chooser.choose)
+    assertNull(chooser.choose)
   }
 }
 
@@ -245,4 +247,4 @@ object TestTieredPriorityChooser {
   def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList(
     Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)),
     Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers)))
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
index fb26bfc..525d126 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
@@ -19,20 +19,22 @@
 
 package org.apache.samza.system.filereader
 
-import org.junit.Assert._
-import scala.collection.JavaConversions._
 import java.io.PrintWriter
 import java.io.File
-import org.scalatest.junit.AssertionsForJUnit
-import org.junit.Test
-import org.junit.Before
-import org.junit.After
 import java.io.RandomAccessFile
+
+import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.Before
+import org.junit.After
+import org.scalatest.junit.AssertionsForJUnit
+
 import scala.collection.mutable.HashMap
-import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
 
 class TestFileReaderSystemAdmin extends AssertionsForJUnit {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index f505eb1..5707bb4 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -22,12 +22,14 @@ package org.apache.samza.system.filereader
 import java.io.File
 import java.io.FileWriter
 import java.io.PrintWriter
+
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.AfterClass
 import org.junit.Assert._
 import org.junit.BeforeClass
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
index 330df78..c3295f3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
@@ -19,11 +19,12 @@
 
 package org.apache.samza.system.filereader
 
+import org.apache.samza.SamzaException
 import org.junit.Assert._
-import scala.collection.JavaConversions._
-import org.scalatest.junit.AssertionsForJUnit
 import org.junit.Test
-import org.apache.samza.SamzaException
+import org.scalatest.junit.AssertionsForJUnit
+
+import scala.collection.JavaConversions._
 
 class TestFileReaderSystemFactory extends AssertionsForJUnit {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index 7cfeb5a..c141b5f 100644
--- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -19,10 +19,10 @@
 
 package org.apache.samza.task
 
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.samza.task.TaskCoordinator.RequestScope
 import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.Test
 
 class TestReadableCoordinator {
   val taskName = new TaskName("P0")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
index 6353378..ee56e20 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.util
 
 import org.junit.Assert._
@@ -28,9 +29,9 @@ class TestDaemonThreadFactory {
     val dtf = new DaemonThreadFactory(testThreadName)
     val threadWithName = dtf.newThread(new Runnable {
       def run() {
-        //Not testing this particular method
+        // Not testing this particular method
       }
     })
-    assertEquals(threadWithName.getName, ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName)
+    assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 4a561d1..9ba8a4d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,14 +15,13 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 package org.apache.samza.util
 
+import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
 
 class TestExponentialSleepStrategy {
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 7c314ce..8c21901 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.util
 
 import org.apache.samza.Partition
@@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.Util._
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
 import scala.util.Random
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
index 7a23041..b76d5ad 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
@@ -18,10 +18,10 @@
  */
 package org.apache.samza.checkpoint.kafka
 
+import org.apache.samza.SamzaException
 import org.apache.samza.container.TaskName
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import org.apache.samza.SamzaException
 
 class TestKafkaCheckpointLogKey {
   @Before

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 4827731..553d6b4 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.checkpoint.kafka
 
+import kafka.admin.AdminUtils
 import kafka.common.InvalidMessageSizeException
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.message.InvalidMessageException
@@ -31,20 +32,20 @@ import kafka.utils.TestZKUtils
 import kafka.utils.Utils
 import kafka.utils.ZKStringSerializer
 import kafka.zk.EmbeddedZookeeper
+
 import org.I0Itec.zkclient.ZkClient
 import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
 import org.apache.samza.{ SamzaException, Partition }
 import org.junit.Assert._
 import org.junit.{ AfterClass, BeforeClass, Test }
-import scala.collection.JavaConversions._
+
 import scala.collection._
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import kafka.admin.AdminUtils
-import org.apache.samza.config.MapConfig
 import scala.collection.JavaConversions._
 
 object TestKafkaCheckpointManager {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 468aa3d..8109f73 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -19,15 +19,18 @@
 
 package org.apache.samza.config
 
-import org.junit.Assert._
-import org.junit.Test
 import java.net.URI
 import java.io.File
 import java.util.Properties
-import scala.collection.JavaConversions._
-import org.apache.samza.config.factories.PropertiesConfigFactory
+
 import kafka.consumer.ConsumerConfig
 
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
 class TestKafkaConfig {
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index fabae68..5cf82c2 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -18,10 +18,12 @@
  */
 
 package org.apache.samza.config
+
+import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConversions._
-import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 
 class TestKafkaSerdeConfig {
   val MAGIC_VAL = "1000"
@@ -33,7 +35,7 @@ class TestKafkaSerdeConfig {
 
   @Test
   def testKafkaConfigurationIsBackwardsCompatible {
-    assert(config.getKafkaEncoder("test").getOrElse("").equals(MAGIC_VAL))
-    assert(config.getKafkaDecoder("test").getOrElse("").equals(MAGIC_VAL))
+    assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse(""))
+    assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse(""))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 77cdbe3..89ced34 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -19,11 +19,13 @@
 
 package org.apache.samza.config
 
-import org.junit.Test
 import collection.JavaConversions._
+
+import org.apache.samza.SamzaException
 import org.junit.Assert._
+import org.junit.Test
+
 import KafkaConfig._
-import org.apache.samza.SamzaException
 
 class TestRegExTopicGenerator {