You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/09/16 18:29:34 UTC
[2/2] git commit: SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods
SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/811f2897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/811f2897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/811f2897
Branch: refs/heads/master
Commit: 811f2897c640c684f4506a6bab7590304de34fca
Parents: cb40a59
Author: David Chen <dc...@linkedin.com>
Authored: Tue Sep 16 09:29:20 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Sep 16 09:29:20 2014 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/TestConfig.java | 45 ++--
.../metrics/TestSlidingTimeWindowReservoir.java | 20 +-
.../org/apache/samza/metrics/TestTimer.java | 14 +-
.../samza/util/TestBlockingEnvelopeMap.java | 2 +
.../samza/util/TestNoOpMetricsRegistry.java | 26 ++-
...inglePartitionWithoutOffsetsSystemAdmin.java | 10 +-
.../samza/checkpoint/TestCheckpointTool.scala | 3 +-
.../samza/checkpoint/TestOffsetManager.scala | 11 +-
.../factories/TestPropertiesConfigFactory.scala | 4 +-
.../samza/container/TestSamzaContainer.scala | 14 +-
.../samza/container/TestTaskInstance.scala | 29 +--
.../TestTaskNamesToSystemStreamPartitions.scala | 6 +-
.../grouper/stream/GroupByTestBase.scala | 13 +-
.../grouper/stream/TestGroupByPartition.scala | 10 +-
.../TestGroupBySystemStreamPartition.scala | 15 +-
.../org/apache/samza/job/TestJobRunner.scala | 5 +-
.../samza/job/TestShellCommandBuilder.scala | 2 +-
.../apache/samza/job/local/TestProcessJob.scala | 3 +-
.../apache/samza/job/local/TestThreadJob.scala | 3 +-
.../apache/samza/metrics/TestJmxServer.scala | 17 +-
.../metrics/reporter/TestJmxReporter.scala | 19 +-
.../samza/serializers/TestByteSerde.scala | 11 +-
.../samza/serializers/TestCheckpointSerde.scala | 2 +
.../samza/serializers/TestIntegerSerde.scala | 7 +-
.../samza/serializers/TestStringSerde.scala | 5 +-
.../samza/system/TestSystemConsumers.scala | 19 +-
.../system/chooser/TestBatchingChooser.scala | 22 +-
.../chooser/TestBootstrappingChooser.scala | 70 +++---
.../system/chooser/TestDefaultChooser.scala | 23 +-
.../system/chooser/TestRoundRobinChooser.scala | 19 +-
.../chooser/TestTieredPriorityChooser.scala | 56 ++---
.../filereader/TestFileReaderSystemAdmin.scala | 16 +-
.../TestFileReaderSystemConsumer.scala | 2 +
.../TestFileReaderSystemFactory.scala | 7 +-
.../samza/task/TestReadableCoordinator.scala | 4 +-
.../samza/util/TestDaemonThreadFactory.scala | 7 +-
.../util/TestExponentialSleepStrategy.scala | 4 +-
.../scala/org/apache/samza/util/TestUtil.scala | 2 +
.../kafka/TestKafkaCheckpointLogKey.scala | 2 +-
.../kafka/TestKafkaCheckpointManager.scala | 9 +-
.../apache/samza/config/TestKafkaConfig.scala | 11 +-
.../samza/config/TestKafkaSerdeConfig.scala | 8 +-
.../samza/config/TestRegExTopicGenerator.scala | 6 +-
.../samza/serializers/TestKafkaSerde.scala | 8 +-
.../samza/system/kafka/TestBrokerProxy.scala | 26 +--
.../samza/system/kafka/TestGetOffset.scala | 16 +-
.../system/kafka/TestKafkaSystemAdmin.scala | 37 ++--
.../system/kafka/TestKafkaSystemConsumer.scala | 15 +-
.../system/kafka/TestKafkaSystemFactory.scala | 11 +-
.../system/kafka/TestKafkaSystemProducer.scala | 24 ++-
.../system/kafka/TestTopicMetadataCache.scala | 14 +-
.../samza/storage/kv/TestKeyValueStores.scala | 16 +-
.../samza/logging/log4j/TestJmxAppender.java | 5 +-
.../samza/serializers/TestJsonSerde.scala | 4 +-
.../serializers/TestMetricsSnapshotSerde.scala | 14 +-
.../test/integration/TestStatefulTask.scala | 11 +-
.../TestSamzaContainerPerformance.scala | 17 +-
.../samza/job/yarn/TestSamzaAppMaster.scala | 25 +--
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 16 +-
.../job/yarn/TestSamzaAppMasterService.scala | 18 +-
.../yarn/TestSamzaAppMasterTaskManager.scala | 214 +++++++++----------
61 files changed, 587 insertions(+), 487 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
index e701296..b4100c2 100644
--- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
+++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java
@@ -19,13 +19,18 @@
package org.apache.samza.config;
-import org.junit.Assert.* ;
-import org.junit.Test ;
-import java.util.Map ;
-import java.util.HashMap ;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
public class TestConfig {
- // Utility methods to make it easier to tell the class of a primitive via overloaded args
+ /**
+ * Utility methods to make it easier to tell the class of a primitive via
+ * overloaded args
+ */
Class getClass(long l) {
return Long.class ;
}
@@ -35,25 +40,25 @@ public class TestConfig {
}
@Test
- public void testgetShortAndLong(){
- Map<String, String> m = new HashMap<String, String>() { {
- put("testkey", "11") ;
- } } ;
+ public void testgetShortAndLong() {
+ Map<String, String> m = new HashMap<String, String>() {{
+ put("testkey", "11");
+ }};
- MapConfig mc = new MapConfig(m) ;
- short defaultShort=0 ;
- long defaultLong=0 ;
+ MapConfig mc = new MapConfig(m);
+ short defaultShort = 0;
+ long defaultLong = 0;
- Class c1 = getClass(mc.getShort("testkey")) ;
- assert(c1 == Short.class) ;
+ Class c1 = getClass(mc.getShort("testkey"));
+ assertEquals(Short.class, c1);
- Class c2 = getClass(mc.getShort("testkey", defaultShort)) ;
- assert(c2 == Short.class) ;
+ Class c2 = getClass(mc.getShort("testkey", defaultShort));
+ assertEquals(Short.class, c2);
- Class c3 = getClass(mc.getLong("testkey")) ;
- assert(c3 == Long.class) ;
+ Class c3 = getClass(mc.getLong("testkey"));
+ assertEquals(Long.class, c3);
- Class c4 = getClass(mc.getLong("testkey", defaultLong)) ;
- assert(c4 == Long.class) ;
+ Class c4 = getClass(mc.getLong("testkey", defaultLong));
+ assertEquals(Long.class, c4);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
index eb5043b..d392b32 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java
@@ -19,6 +19,7 @@
package org.apache.samza.metrics;
+import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
import java.util.Arrays;
@@ -26,15 +27,14 @@ import java.util.Arrays;
import org.apache.samza.util.Clock;
import org.junit.Test;
-import static org.mockito.Mockito.*;
-
public class TestSlidingTimeWindowReservoir {
private final Clock clock = mock(Clock.class);
@Test
public void testUpdateSizeSnapshot() {
- SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+ new SlidingTimeWindowReservoir(300, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
@@ -49,24 +49,26 @@ public class TestSlidingTimeWindowReservoir {
Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
- assertTrue(snapshot.getSize() == 3);
+ assertEquals(3, snapshot.getSize());
}
@Test
public void testDuplicateTime() {
- SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+ new SlidingTimeWindowReservoir(300, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
slidingTimeWindowReservoir.update(2L);
Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
- assertTrue(snapshot.getSize() == 2);
+ assertEquals(2, snapshot.getSize());
}
@Test
public void testRemoveExpiredValues() {
- SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock);
+ SlidingTimeWindowReservoir slidingTimeWindowReservoir =
+ new SlidingTimeWindowReservoir(300, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
@@ -81,6 +83,6 @@ public class TestSlidingTimeWindowReservoir {
Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
- assertTrue(snapshot.getSize() == 2);
+ assertEquals(2, snapshot.getSize());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
index dcc3cb8..63c183f 100644
--- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
+++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java
@@ -28,7 +28,9 @@ import org.junit.Test;
public class TestTimer {
- // mock clock
+ /*
+ * Mock clock
+ */
private final Clock clock = new Clock() {
long value = 0;
@@ -46,7 +48,7 @@ public class TestTimer {
Snapshot snapshot = timer.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
- assertTrue(snapshot.getValues().size() == 2);
+ assertEquals(2, snapshot.getValues().size());
}
@Test
@@ -58,13 +60,13 @@ public class TestTimer {
Snapshot snapshot = timer.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L)));
- assertTrue(snapshot.getValues().size() == 3);
+ assertEquals(3, snapshot.getValues().size());
- // the time is 500 for update(4L) because getSnapshot calls clock once + 3
+ // The time is 500 for update(4L) because getSnapshot calls clock once + 3
// updates that call clock 3 times
timer.update(4L);
Snapshot snapshot2 = timer.getSnapshot();
assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L)));
- assertTrue(snapshot2.getValues().size() == 2);
+ assertEquals(2, snapshot2.getValues().size());
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 35ba52d..4eb87eb 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
index 2d0034f..1d1e3c5 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java
@@ -33,32 +33,38 @@ public class TestNoOpMetricsRegistry {
Counter counter1 = registry.newCounter("testc", "a");
Counter counter2 = registry.newCounter("testc", "b");
Counter counter3 = registry.newCounter("testc2", "c");
+
Gauge<String> gauge1 = registry.newGauge("testg", "a", "1");
Gauge<String> gauge2 = registry.newGauge("testg", "b", "2");
Gauge<String> gauge3 = registry.newGauge("testg", "c", "3");
Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4");
+
Timer timer1 = registry.newTimer("testt", "a");
Timer timer2 = registry.newTimer("testt", "b");
Timer timer3 = registry.newTimer("testt2", "c");
+
counter1.inc();
counter2.inc(2);
counter3.inc(4);
+
gauge1.set("5");
gauge2.set("6");
gauge3.set("7");
gauge4.set("8");
+
timer1.update(1L);
timer2.update(2L);
timer3.update(3L);
- assertEquals(counter1.getCount(), 1);
- assertEquals(counter2.getCount(), 2);
- assertEquals(counter3.getCount(), 4);
- assertEquals(gauge1.getValue(), "5");
- assertEquals(gauge2.getValue(), "6");
- assertEquals(gauge3.getValue(), "7");
- assertEquals(gauge4.getValue(), "8");
- assertEquals(timer1.getSnapshot().getAverage(), 1, 0);
- assertEquals(timer2.getSnapshot().getAverage(), 2, 0);
- assertEquals(timer3.getSnapshot().getAverage(), 3, 0);
+
+ assertEquals(1, counter1.getCount());
+ assertEquals(2, counter2.getCount());
+ assertEquals(4, counter3.getCount());
+ assertEquals("5", gauge1.getValue());
+ assertEquals("6", gauge2.getValue());
+ assertEquals("7", gauge3.getValue());
+ assertEquals("8", gauge4.getValue());
+ assertEquals(1, timer1.getSnapshot().getAverage(), 0);
+ assertEquals(2, timer2.getSnapshot().getAverage(), 0);
+ assertEquals(3, timer3.getSnapshot().getAverage(), 0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
index 4166493..025f0a6 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java
@@ -20,10 +20,12 @@
package org.apache.samza.util;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
import org.apache.samza.Partition;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamMetadata;
@@ -36,12 +38,14 @@ public class TestSinglePartitionWithoutOffsetsSystemAdmin {
Set<String> streamNames = new HashSet<String>();
streamNames.add("a");
streamNames.add("b");
+
Map<String, SystemStreamMetadata> metadata = admin.getSystemStreamMetadata(streamNames);
- assertEquals(metadata.size(), 2);
+ assertEquals(2, metadata.size());
SystemStreamMetadata metadata1 = metadata.get("a");
SystemStreamMetadata metadata2 = metadata.get("b");
+
assertEquals(1, metadata1.getSystemStreamPartitionMetadata().size());
assertEquals(1, metadata2.getSystemStreamPartitionMetadata().size());
- assertEquals(null, metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
+ assertNull(metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0))));
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 1eb3995..af800df 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -20,6 +20,7 @@
package org.apache.samza.checkpoint
import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig}
import org.apache.samza.metrics.MetricsRegistry
@@ -30,8 +31,8 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mock.MockitoSugar
+
import scala.collection.JavaConversions._
-import org.apache.samza.container.TaskName
object TestCheckpointTool {
var checkpointManager: CheckpointManager = null
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 44a98a5..a79ecca 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -19,7 +19,9 @@
package org.apache.samza.checkpoint
-import scala.collection.JavaConversions._
+import java.util
+
+import org.apache.samza.container.TaskName
import org.apache.samza.Partition
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
@@ -30,10 +32,10 @@ import org.junit.{Ignore, Test}
import org.apache.samza.SamzaException
import org.apache.samza.config.MapConfig
import org.apache.samza.system.SystemAdmin
-import java.util
-import org.apache.samza.container.TaskName
import org.scalatest.Assertions.intercept
+import scala.collection.JavaConversions._
+
class TestOffsetManager {
@Test
def testSystemShouldUseDefaults {
@@ -47,7 +49,7 @@ class TestOffsetManager {
val offsetManager = OffsetManager(systemStreamMetadata, config)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
- assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
+ assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
}
@@ -232,7 +234,6 @@ class TestOffsetManager {
override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
index f254741..9688abb 100644
--- a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala
@@ -18,8 +18,10 @@
*/
package org.apache.samza.config.factories
+
import java.net.URI
import java.io.File
+
import org.apache.samza.SamzaException
import org.junit.Assert._
import org.junit.Test
@@ -30,7 +32,7 @@ class TestPropertiesConfigFactory {
@Test
def testCanReadPropertiesConfigFiles {
val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
- assert("bar".equals(config.get("foo")))
+ assertEquals("bar", config.get("foo"))
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 8a04a8a..b7a9569 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -24,26 +24,26 @@ import org.junit.Assert._
import org.junit.Test
import org.apache.samza.Partition
import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.system.SystemConsumer
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.task.StreamTask
import org.apache.samza.task.MessageCollector
-import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.InitableTask
import org.apache.samza.task.TaskContext
import org.apache.samza.task.ClosableTask
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
import org.scalatest.junit.AssertionsForJUnit
-import org.apache.samza.metrics.JmxServer
class TestSamzaContainer extends AssertionsForJUnit {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index be53373..c31a74e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -21,27 +21,28 @@ package org.apache.samza.container
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.task.MessageCollector
-import org.apache.samza.task.StreamTask
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.task.TaskCoordinator
-import org.apache.samza.config.MapConfig
import org.apache.samza.Partition
-import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.SystemProducer
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.config.MapConfig
import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.chooser.RoundRobinChooser
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.task.ReadableCoordinator
+import org.apache.samza.task.StreamTask
+import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.TaskInstanceCollector
+import scala.collection.JavaConversions._
+
class TestTaskInstance {
@Test
def testOffsetsAreUpdatedOnProcess {
@@ -80,4 +81,4 @@ class TestTaskInstance {
assertTrue(lastProcessedOffset.isDefined)
assertEquals("2", lastProcessedOffset.get)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
index d680b20..9a3406e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala
@@ -18,10 +18,10 @@
*/
package org.apache.samza.container
-import org.junit.Test
-import org.junit.Assert._
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.{SamzaException, Partition}
+import org.junit.Test
+import org.junit.Assert._
class TestTaskNamesToSystemStreamPartitions {
var sspCounter = 0
@@ -36,7 +36,7 @@ class TestTaskNamesToSystemStreamPartitions {
val asSet = tntssp.toSet
val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")),
(new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2"))))
- assertEquals(expected , asSet)
+ assertEquals(expected, asSet)
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
index 47d716e..a14169b 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala
@@ -18,15 +18,16 @@
*/
package org.apache.samza.container.grouper.stream
-import org.apache.samza.Partition
-import org.apache.samza.system.SystemStreamPartition
-import org.junit.Test
+import java.util.Collections
import java.util.HashSet
import java.util.Map
import java.util.Set
-import org.junit.Assert._
-import java.util.Collections
+
+import org.apache.samza.Partition
import org.apache.samza.container.TaskName
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Test
+import org.junit.Assert._
object GroupByTestBase {
val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0))
@@ -54,4 +55,4 @@ abstract class GroupByTestBase {
val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input)
assertEquals(output, result)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
index 2fa718c..74daf72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala
@@ -19,15 +19,17 @@
package org.apache.samza.container.grouper.stream
import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
import org.junit.Test
+import scala.collection.JavaConverters._
+
class TestGroupByPartition extends GroupByTestBase {
import GroupByTestBase._
- val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
- new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
- new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
+ // from base class provided set
+ val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava,
+ new TaskName("Partition 1") -> Set(aa1, ab1).asJava,
+ new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava
override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
index 8da0595..deb3895 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala
@@ -19,19 +19,20 @@
package org.apache.samza.container.grouper.stream
import org.apache.samza.container.TaskName
-import scala.collection.JavaConverters._
import org.junit.Test
+import scala.collection.JavaConverters._
class TestGroupBySystemStreamPartition extends GroupByTestBase {
import GroupByTestBase._
// Building manually to avoid just duplicating a logic potential logic error here and there
- val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
- new TaskName(aa1.toString) -> Set(aa1).asJava,
- new TaskName(aa2.toString) -> Set(aa2).asJava,
- new TaskName(ab1.toString) -> Set(ab1).asJava,
- new TaskName(ab2.toString) -> Set(ab2).asJava,
- new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
+ // From base class provided set
+ val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava,
+ new TaskName(aa1.toString) -> Set(aa1).asJava,
+ new TaskName(aa2.toString) -> Set(aa2).asJava,
+ new TaskName(ab1.toString) -> Set(ab1).asJava,
+ new TaskName(ab2.toString) -> Set(ab2).asJava,
+ new TaskName(ac0.toString) -> Set(ac0).asJava).asJava
override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index 258ccc1..52057ed 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -18,9 +18,12 @@
*/
package org.apache.samza.job
+
import java.io.File
+
import org.apache.samza.config.Config
import org.junit.Test
+import org.junit.Assert._
object TestJobRunner {
var processCount = 0
@@ -34,7 +37,7 @@ class TestJobRunner {
"org.apache.samza.config.factories.PropertiesConfigFactory",
"--config-path",
"file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath))
- assert(TestJobRunner.processCount == 1)
+ assertEquals(1, TestJobRunner.processCount)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
index f8a535a..b186ec1 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
@@ -18,12 +18,12 @@
*/
package org.apache.samza.job
-import org.junit.Test
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import org.apache.samza.util.Util._
import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions}
import org.junit.Assert._
+import org.junit.Test
class TestShellCommandBuilder {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index d56024d..7f3ccfe 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -18,6 +18,7 @@
*/
package org.apache.samza.job.local;
+
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.job.ApplicationStatus
@@ -39,6 +40,6 @@ class TestProcessJob {
job.waitForFinish(500)
job.kill
job.waitForFinish(999999)
- assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
index 7d45889..4f3f511 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala
@@ -18,6 +18,7 @@
*/
package org.apache.samza.job.local
+
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.job.ApplicationStatus
@@ -44,6 +45,6 @@ class TestThreadJob {
job.waitForFinish(500)
job.kill
job.waitForFinish(999999)
- assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999)))
+ assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
index f01117d..f49cfaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala
@@ -22,14 +22,15 @@ package org.apache.samza.metrics
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.util.Logging
-import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+
import java.io.IOException
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
class TestJmxServer extends Logging {
@Test
def serverStartsUp {
- var jmxServer:JmxServer = null
+ var jmxServer: JmxServer = null
try {
jmxServer = new JmxServer
@@ -45,14 +46,16 @@ class TestJmxServer extends Logging {
assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0)
} catch {
case ioe:IOException => fail("Couldn't open connection to local JMX server")
- }finally {
- if(jmxConnector != null) jmxConnector.close
+ } finally {
+ if (jmxConnector != null) {
+ jmxConnector.close
+ }
}
} finally {
- if (jmxServer != null) jmxServer.stop
+ if (jmxServer != null) {
+ jmxServer.stop
+ }
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
index f6c8646..3cfd439 100644
--- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
+++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
@@ -23,20 +23,23 @@ import org.junit.Assert._
import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Test
-import scala.collection.JavaConversions._
import org.apache.samza.task.TaskContext
-import javax.management.remote.JMXConnectorFactory
import org.apache.samza.metrics.MetricsRegistryMap
-import javax.management.remote.JMXConnectorServerFactory
-import javax.management.remote.JMXConnectorServer
-import java.rmi.registry.LocateRegistry
-import javax.management.remote.JMXServiceURL
import org.apache.samza.config.MapConfig
-import java.lang.management.ManagementFactory
import org.apache.samza.Partition
-import javax.management.ObjectName
import org.apache.samza.metrics.JvmMetrics
+import java.lang.management.ManagementFactory
+import java.rmi.registry.LocateRegistry
+
+import javax.management.ObjectName
+import javax.management.remote.JMXServiceURL
+import javax.management.remote.JMXConnectorServerFactory
+import javax.management.remote.JMXConnectorServer
+import javax.management.remote.JMXConnectorFactory
+
+import scala.collection.JavaConversions._
+
object TestJmxReporter {
val port = 4500
val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
index f64c263..f605762 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
@@ -19,19 +19,20 @@
package org.apache.samza.serializers
+import java.util.Arrays
+
import org.junit.Assert._
import org.junit.Test
-import java.util.Arrays
class TestByteSerde {
@Test
def testByteSerde {
val serde = new ByteSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
+ assertNull(serde.toBytes(null))
+ assertNull(serde.fromBytes(null))
val testBytes = "A lazy way of creating a byte array".getBytes()
- assertTrue(Arrays.equals(serde.toBytes(testBytes), testBytes))
- assertTrue( Arrays.equals(serde.fromBytes(testBytes), testBytes))
+ assertArrayEquals(serde.toBytes(testBytes), testBytes)
+ assertArrayEquals(serde.fromBytes(testBytes), testBytes)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
index 0d07314..3d0a603 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
@@ -20,12 +20,14 @@
package org.apache.samza.serializers
import java.util
+
import org.apache.samza.Partition
import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConversions._
class TestCheckpointSerde {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
index 45a2b04..ad646d7 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
@@ -19,9 +19,10 @@
package org.apache.samza.serializers
+import java.util.Arrays
+
import org.junit.Assert._
import org.junit.Test
-import java.util.Arrays
class TestIntegerSerde {
@Test
@@ -33,7 +34,7 @@ class TestIntegerSerde {
val fooBar = 37
val fooBarBytes = serde.toBytes(fooBar)
fooBarBytes.foreach(System.err.println)
- assertTrue(Arrays.equals(Array[Byte](0, 0, 0, 37), fooBarBytes))
+ assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
assertEquals(fooBar, serde.fromBytes(fooBarBytes))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
index 7fbf0c2..a1e8e88 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
@@ -21,7 +21,6 @@ package org.apache.samza.serializers
import org.junit.Assert._
import org.junit.Test
-import java.util.Arrays
class TestStringSerde {
@Test
@@ -32,7 +31,7 @@ class TestStringSerde {
val fooBar = "foo bar"
val fooBarBytes = serde.toBytes(fooBar)
- assertTrue(Arrays.equals(fooBar.getBytes("UTF-8"), fooBarBytes))
+ assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
assertEquals(fooBar, serde.fromBytes(fooBarBytes))
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 04229a6..3fdc781 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -19,15 +19,16 @@
package org.apache.samza.system
-import scala.collection.JavaConversions._
-import org.apache.samza.Partition
import org.junit.Assert._
import org.junit.Test
+import org.apache.samza.Partition
+import org.apache.samza.serializers._
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.serializers._
import org.apache.samza.system.chooser.MockMessageChooser
+import org.apache.samza.util.BlockingEnvelopeMap
+
+import scala.collection.JavaConversions._
class TestSystemConsumers {
def testPollIntervalMs {
@@ -44,7 +45,7 @@ class TestSystemConsumers {
consumers.register(systemStreamPartition1, "1234")
consumers.start
- // Tell the consumer to respond with 1000 messages for SSP0, and no
+ // Tell the consumer to respond with 1000 messages for SSP0, and no
// messages for SSP1.
consumer.setResponseSizes(numEnvelopes)
@@ -60,13 +61,13 @@ class TestSystemConsumers {
// We aren't polling because we're getting non-null envelopes.
assertEquals(2, consumer.polls)
- // Advance the clock to trigger a new poll even though there are still
+ // Advance the clock to trigger a new poll even though there are still
// messages.
now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS
assertEquals(envelope, consumers.choose)
- // We polled even though there are still 997 messages in the unprocessed
+ // We polled even though there are still 997 messages in the unprocessed
// message buffer.
assertEquals(3, consumer.polls)
assertEquals(1, consumer.lastPoll.size)
@@ -74,7 +75,7 @@ class TestSystemConsumers {
// Only SSP1 was polled because we still have messages for SSP0.
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
- // Now drain all messages for SSP0. There should be exactly 997 messages,
+ // Now drain all messages for SSP0. There should be exactly 997 messages,
// since we have chosen 3 already, and we started with 1000.
(0 until (numEnvelopes - 3)).foreach { i =>
assertEquals(envelope, consumers.choose)
@@ -296,4 +297,4 @@ class TestSystemConsumers {
def stop {}
def register { super.register(systemStreamPartition, "0") }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
index d7632b4..6d53697 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala
@@ -19,16 +19,18 @@
package org.apache.samza.system.chooser
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
+import org.apache.samza.Partition
import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
+import org.junit.Assert._
+import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
+import scala.collection.immutable.Queue
@RunWith(value = classOf[Parameterized])
class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
@@ -45,9 +47,9 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
chooser.start
// Make sure start and register are working.
assertEquals(1, mock.starts)
- assertEquals(null, mock.registers(envelope1.getSystemStreamPartition))
+ assertNull(mock.registers(envelope1.getSystemStreamPartition))
assertEquals("", mock.registers(envelope2.getSystemStreamPartition))
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
assertEquals(envelope1, mock.getEnvelopes.head)
assertEquals(envelope1, chooser.choose)
@@ -84,11 +86,11 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) {
}
object TestBatchingChooser {
- // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
- // just batch size defined should behave just like plain vanilla batching
+ // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+ // just batch size defined should behave just like plain vanilla batching
// chooser.
@Parameters
def parameters: java.util.Collection[Array[(MessageChooser, Int) => MessageChooser]] = Arrays.asList(
Array((wrapped: MessageChooser, batchSize: Int) => new BatchingChooser(wrapped, batchSize)),
Array((wrapped: MessageChooser, batchSize: Int) => new DefaultChooser(wrapped, Some(batchSize))))
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 993daa6..3c2693c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -19,20 +19,22 @@
package org.apache.samza.system.chooser
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
+import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+
import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
+import scala.collection.immutable.Queue
@RunWith(value = classOf[Parameterized])
class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -61,7 +63,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition))
chooser.update(envelope1)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.stop
assertEquals(1, mock.stops)
}
@@ -72,16 +74,16 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
val metadata = getMetadata(envelope1, "100", Some("123"))
val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
- // Even though envelope1's SSP is registered as a bootstrap stream, since
- // 123=123, it should be marked as "caught up" and treated like a normal
- // stream. This means that non-bootstrap stream envelope should be allowed
+ // Even though envelope1's SSP is registered as a bootstrap stream, since
+ // 123=123, it should be marked as "caught up" and treated like a normal
+ // stream. This means that non-bootstrap stream envelope should be allowed
// to be chosen.
chooser.register(envelope1.getSystemStreamPartition, "123")
chooser.register(envelope2.getSystemStreamPartition, "321")
chooser.start
chooser.update(envelope2)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -90,40 +92,40 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
val metadata = getMetadata(envelope1, "123")
val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata))
- // Even though envelope1's SSP is registered as a bootstrap stream, since
- // 123=123, it should be marked as "caught up" and treated like a normal
- // stream. This means that non-bootstrap stream envelope should be allowed
+ // Even though envelope1's SSP is registered as a bootstrap stream, since
+ // 123=123, it should be marked as "caught up" and treated like a normal
+ // stream. This means that non-bootstrap stream envelope should be allowed
// to be chosen.
chooser.register(envelope1.getSystemStreamPartition, "1")
chooser.register(envelope2.getSystemStreamPartition, null)
chooser.start
chooser.update(envelope2)
- // Choose should not return anything since bootstrapper is blocking
+ // Choose should not return anything since bootstrapper is blocking
// wrapped.choose until it gets an update from envelope1's SSP.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
- // Now that we have an update from the required SSP, the mock chooser
+ // Now that we have an update from the required SSP, the mock chooser
// should be called, and return.
assertEquals(envelope2, chooser.choose)
- // The chooser still has an envelope from envelope1's SSP, so it should
+ // The chooser still has an envelope from envelope1's SSP, so it should
// return.
assertEquals(envelope1, chooser.choose)
// No envelope for envelope1's SSP has been given, so it should block.
chooser.update(envelope2)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Now we're giving an envelope with the proper last offset (123), so
// envelope1's SSP should be treated no differently than envelope2's.
chooser.update(envelope4)
assertEquals(envelope2, chooser.choose)
assertEquals(envelope4, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Should not block here since there are no more lagging bootstrap streams.
chooser.update(envelope2)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -138,54 +140,54 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
chooser.register(envelope3.getSystemStreamPartition, "1")
chooser.start
chooser.update(envelope1)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope3)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
// Fully loaded now.
assertEquals(envelope1, chooser.choose)
// Can't pick again because envelope1's SSP is missing.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
// Can pick again.
assertEquals(envelope3, chooser.choose)
// Can still pick since envelope3.SSP isn't being tracked.
assertEquals(envelope2, chooser.choose)
// Can't pick since envelope2.SSP needs an envelope now.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
// Now we get envelope1 again.
assertEquals(envelope1, chooser.choose)
// Can't pick again.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Now use envelope4, to trigger "all caught up" for envelope1.SSP.
chooser.update(envelope4)
// Chooser's contents is currently: e2, e4 (System.err.println(mock.getEnvelopes))
// Add envelope3, whose SSP isn't being tracked.
chooser.update(envelope3)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
// Chooser's contents is currently: e4, e3, e2 (System.err.println(mock.getEnvelopes))
assertEquals(envelope4, chooser.choose)
- // This should be allowed, even though no message from envelope1.SSP is
- // available, since envelope4 triggered "all caught up" because its offset
- // matches the offset map for this SSP, and we still have an envelope for
+ // This should be allowed, even though no message from envelope1.SSP is
+ // available, since envelope4 triggered "all caught up" because its offset
+ // matches the offset map for this SSP, and we still have an envelope for
// envelope2.SSP in the queue.
assertEquals(envelope3, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Fin.
}
}
object TestBootstrappingChooser {
- // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
- // just batch size defined should behave just like plain vanilla batching
+ // Test both BatchingChooser and DefaultChooser here. DefaultChooser with
+ // just batch size defined should behave just like plain vanilla batching
// chooser.
@Parameters
def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)),
Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata)))
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 884e458..0909956 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -26,12 +26,13 @@ import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import org.apache.samza.config.MapConfig
-import scala.collection.JavaConversions._
import org.apache.samza.config.DefaultChooserConfig
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import scala.collection.JavaConversions._
+
class TestDefaultChooser {
val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
@@ -47,7 +48,7 @@ class TestDefaultChooser {
val mock0 = new MockMessageChooser
val mock1 = new MockMessageChooser
val mock2 = new MockMessageChooser
- // Create metadata for two envelopes (1 and 5) that are part of the same
+ // Create metadata for two envelopes (1 and 5) that are part of the same
// stream, but have different partitions and offsets.
val env1Metadata = new SystemStreamPartitionMetadata(null, "123", null)
val env5Metadata = new SystemStreamPartitionMetadata(null, "321", null)
@@ -75,28 +76,28 @@ class TestDefaultChooser {
chooser.register(envelope2.getSystemStreamPartition, null)
chooser.register(envelope3.getSystemStreamPartition, null)
chooser.register(envelope5.getSystemStreamPartition, null)
- // Add a bootstrap stream that's already caught up. If everything is
+ // Add a bootstrap stream that's already caught up. If everything is
// working properly, it shouldn't interfere with anything.
chooser.register(envelope8.getSystemStreamPartition, "654")
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Load with a non-bootstrap stream, and should still get null.
chooser.update(envelope3)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Load with a bootstrap stream, should get that envelope.
chooser.update(envelope1)
assertEquals(envelope1, chooser.choose)
// Should block envelope3 since we have no message from envelope1's bootstrap stream.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Load envelope2 from non-bootstrap stream with higher priority than envelope3.
chooser.update(envelope2)
// Should block envelope2 since we have no message from envelope1's bootstrap stream.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Test batching by giving chooser envelope1 and envelope5, both from same stream, but envelope1 should be preferred partition.
chooser.update(envelope5)
@@ -107,14 +108,14 @@ class TestDefaultChooser {
chooser.update(envelope1)
assertEquals(envelope5, chooser.choose)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset.
chooser.update(envelope4)
assertEquals(envelope4, chooser.choose)
// Should still block envelopes 1 and 2 because the second partition hasn't caught up yet.
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Now catch up the second partition.
chooser.update(envelope6)
@@ -135,7 +136,7 @@ class TestDefaultChooser {
// Now we should finally get the lowest priority non-bootstrap stream, envelope3.
assertEquals(envelope3, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -166,4 +167,4 @@ class TestDefaultChooser {
class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {
def start = Unit
def stop = Unit
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
index 01802b9..1329e84 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala
@@ -19,15 +19,16 @@
package org.apache.samza.system.chooser
-import org.junit.Assert._
-import org.junit.Test
+import java.util.Arrays
+
import org.apache.samza.Partition
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
+import org.junit.Assert._
+import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
@RunWith(value = classOf[Parameterized])
class TestRoundRobinChooser(getChooser: () => MessageChooser) {
@@ -43,12 +44,12 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
chooser.register(envelope3.getSystemStreamPartition, "123")
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Test one message.
chooser.update(envelope1)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Verify simple ordering.
chooser.update(envelope1)
@@ -58,7 +59,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
assertEquals(envelope1, chooser.choose)
assertEquals(envelope2, chooser.choose)
assertEquals(envelope3, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Verify mixed ordering.
chooser.update(envelope2)
@@ -72,7 +73,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
assertEquals(envelope1, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Verify simple ordering with different starting envelope.
chooser.update(envelope2)
@@ -82,7 +83,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) {
assertEquals(envelope2, chooser.choose)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope3, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
}
@@ -92,4 +93,4 @@ object TestRoundRobinChooser {
// plain vanilla round robin chooser.
@Parameters
def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser))
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
index 4cde630..3e435ae 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala
@@ -19,18 +19,20 @@
package org.apache.samza.system.chooser
+import java.util.Arrays
+
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.system.IncomingMessageEnvelope
-import scala.collection.immutable.Queue
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-import org.apache.samza.SamzaException
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
-import java.util.Arrays
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+
+import scala.collection.immutable.Queue
@RunWith(value = classOf[Parameterized])
class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) {
@@ -68,10 +70,10 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
chooser.register(envelope1.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -85,7 +87,7 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
// The SSP for envelope2 is not defined as a priority stream.
chooser.register(envelope2.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
try {
chooser.update(envelope2)
@@ -106,18 +108,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
chooser.register(envelope1.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
chooser.update(envelope4)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope4, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope4)
chooser.update(envelope1)
assertEquals(envelope4, chooser.choose)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -132,18 +134,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
chooser.register(envelope3.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
chooser.update(envelope3)
assertEquals(envelope2, chooser.choose)
assertEquals(envelope3, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope3)
chooser.update(envelope2)
assertEquals(envelope3, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -160,30 +162,30 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
chooser.register(envelope2.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
chooser.update(envelope4)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope4, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope4)
chooser.update(envelope1)
assertEquals(envelope4, chooser.choose)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
chooser.update(envelope4)
assertEquals(envelope2, chooser.choose)
assertEquals(envelope4, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
chooser.update(envelope2)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
@Test
@@ -203,18 +205,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
chooser.register(envelope2.getSystemStreamPartition, null)
chooser.start
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
chooser.update(envelope4)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope4, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope4)
chooser.update(envelope1)
assertEquals(envelope4, chooser.choose)
assertEquals(envelope1, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope2)
chooser.update(envelope4)
@@ -222,18 +224,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me
// priority.
assertEquals(envelope4, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
chooser.update(envelope1)
chooser.update(envelope2)
assertEquals(envelope1, chooser.choose)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
// Just the low priority stream.
chooser.update(envelope2)
assertEquals(envelope2, chooser.choose)
- assertEquals(null, chooser.choose)
+ assertNull(chooser.choose)
}
}
@@ -245,4 +247,4 @@ object TestTieredPriorityChooser {
def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList(
Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)),
Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers)))
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
index fb26bfc..525d126 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
@@ -19,20 +19,22 @@
package org.apache.samza.system.filereader
-import org.junit.Assert._
-import scala.collection.JavaConversions._
import java.io.PrintWriter
import java.io.File
-import org.scalatest.junit.AssertionsForJUnit
-import org.junit.Test
-import org.junit.Before
-import org.junit.After
import java.io.RandomAccessFile
+
+import org.apache.samza.SamzaException
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.Before
+import org.junit.After
+import org.scalatest.junit.AssertionsForJUnit
+
import scala.collection.mutable.HashMap
-import org.apache.samza.SamzaException
+import scala.collection.JavaConversions._
class TestFileReaderSystemAdmin extends AssertionsForJUnit {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
index f505eb1..5707bb4 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
@@ -22,12 +22,14 @@ package org.apache.samza.system.filereader
import java.io.File
import java.io.FileWriter
import java.io.PrintWriter
+
import org.apache.samza.Partition
import org.apache.samza.system.SystemStreamPartition
import org.junit.AfterClass
import org.junit.Assert._
import org.junit.BeforeClass
import org.junit.Test
+
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
index 330df78..c3295f3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
@@ -19,11 +19,12 @@
package org.apache.samza.system.filereader
+import org.apache.samza.SamzaException
import org.junit.Assert._
-import scala.collection.JavaConversions._
-import org.scalatest.junit.AssertionsForJUnit
import org.junit.Test
-import org.apache.samza.SamzaException
+import org.scalatest.junit.AssertionsForJUnit
+
+import scala.collection.JavaConversions._
class TestFileReaderSystemFactory extends AssertionsForJUnit {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
index 7cfeb5a..c141b5f 100644
--- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala
@@ -19,10 +19,10 @@
package org.apache.samza.task
-import org.junit.Assert._
-import org.junit.Test
import org.apache.samza.task.TaskCoordinator.RequestScope
import org.apache.samza.container.TaskName
+import org.junit.Assert._
+import org.junit.Test
class TestReadableCoordinator {
val taskName = new TaskName("P0")
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
index 6353378..ee56e20 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.samza.util
import org.junit.Assert._
@@ -28,9 +29,9 @@ class TestDaemonThreadFactory {
val dtf = new DaemonThreadFactory(testThreadName)
val threadWithName = dtf.newThread(new Runnable {
def run() {
- //Not testing this particular method
+ // Not testing this particular method
}
})
- assertEquals(threadWithName.getName, ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName)
+ assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
index 4a561d1..9ba8a4d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,14 +15,13 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
package org.apache.samza.util
+import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
class TestExponentialSleepStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 7c314ce..8c21901 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.samza.util
import org.apache.samza.Partition
@@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.Util._
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConversions._
import scala.util.Random
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
index 7a23041..b76d5ad 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
@@ -18,10 +18,10 @@
*/
package org.apache.samza.checkpoint.kafka
+import org.apache.samza.SamzaException
import org.apache.samza.container.TaskName
import org.junit.Assert._
import org.junit.{Before, Test}
-import org.apache.samza.SamzaException
class TestKafkaCheckpointLogKey {
@Before
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 4827731..553d6b4 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -19,6 +19,7 @@
package org.apache.samza.checkpoint.kafka
+import kafka.admin.AdminUtils
import kafka.common.InvalidMessageSizeException
import kafka.common.UnknownTopicOrPartitionException
import kafka.message.InvalidMessageException
@@ -31,20 +32,20 @@ import kafka.utils.TestZKUtils
import kafka.utils.Utils
import kafka.utils.ZKStringSerializer
import kafka.zk.EmbeddedZookeeper
+
import org.I0Itec.zkclient.ZkClient
import org.apache.samza.checkpoint.Checkpoint
+import org.apache.samza.config.MapConfig
import org.apache.samza.container.TaskName
+import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
import org.apache.samza.{ SamzaException, Partition }
import org.junit.Assert._
import org.junit.{ AfterClass, BeforeClass, Test }
-import scala.collection.JavaConversions._
+
import scala.collection._
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import kafka.admin.AdminUtils
-import org.apache.samza.config.MapConfig
import scala.collection.JavaConversions._
object TestKafkaCheckpointManager {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 468aa3d..8109f73 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -19,15 +19,18 @@
package org.apache.samza.config
-import org.junit.Assert._
-import org.junit.Test
import java.net.URI
import java.io.File
import java.util.Properties
-import scala.collection.JavaConversions._
-import org.apache.samza.config.factories.PropertiesConfigFactory
+
import kafka.consumer.ConsumerConfig
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConversions._
+
class TestKafkaConfig {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index fabae68..5cf82c2 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -18,10 +18,12 @@
*/
package org.apache.samza.config
+
+import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConversions._
-import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
class TestKafkaSerdeConfig {
val MAGIC_VAL = "1000"
@@ -33,7 +35,7 @@ class TestKafkaSerdeConfig {
@Test
def testKafkaConfigurationIsBackwardsCompatible {
- assert(config.getKafkaEncoder("test").getOrElse("").equals(MAGIC_VAL))
- assert(config.getKafkaDecoder("test").getOrElse("").equals(MAGIC_VAL))
+ assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse(""))
+ assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse(""))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 77cdbe3..89ced34 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -19,11 +19,13 @@
package org.apache.samza.config
-import org.junit.Test
import collection.JavaConversions._
+
+import org.apache.samza.SamzaException
import org.junit.Assert._
+import org.junit.Test
+
import KafkaConfig._
-import org.apache.samza.SamzaException
class TestRegExTopicGenerator {