You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:40 UTC

[44/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlidingWindowCounterTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlidingWindowCounterTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlidingWindowCounterTest.java
new file mode 100644
index 0000000..e7b71cf
--- /dev/null
+++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlidingWindowCounterTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+
+public class SlidingWindowCounterTest {
+
+  private static final int ANY_WINDOW_LENGTH_IN_SLOTS = 2;
+  private static final Object ANY_OBJECT = "ANY_OBJECT";
+
+  @DataProvider
+  public Object[][] illegalWindowLengths() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 }, { 1 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalWindowLengths")
+  public void lessThanTwoSlotsShouldThrowIAE(int windowLengthInSlots) {
+    new SlidingWindowCounter<Object>(windowLengthInSlots);
+  }
+
+  @DataProvider
+  public Object[][] legalWindowLengths() {
+    return new Object[][]{ { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalWindowLengths")
+  public void twoOrMoreSlotsShouldBeValid(int windowLengthInSlots) {
+    new SlidingWindowCounter<Object>(windowLengthInSlots);
+  }
+
+  @Test
+  public void newInstanceShouldHaveEmptyCounts() {
+    // given
+    SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(ANY_WINDOW_LENGTH_IN_SLOTS);
+
+    // when
+    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
+
+    // then
+    assertThat(counts).isEmpty();
+  }
+
+  @DataProvider
+  public Object[][] simulatedCounterIterations() {
+    return new Object[][]{ { 2, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 2, 0, 1, 1, 0, 0 } },
+        { 3, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 2, 1, 1, 1, 0 } },
+        { 4, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 3, 1, 1, 1 } },
+        { 5, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 6, 3, 1, 1 } },
+        { 5, new int[]{ 3, 11, 5, 13, 7, 17, 0, 3, 50, 600, 7000 },
+            new long[]{ 3, 14, 19, 32, 39, 53, 42, 40, 77, 670, 7653 } }, };
+  }
+
+  @Test(dataProvider = "simulatedCounterIterations")
+  public void testCounterWithSimulatedRuns(int windowLengthInSlots, int[] incrementsPerIteration,
+      long[] expCountsPerIteration) {
+    // given
+    SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(windowLengthInSlots);
+    int numIterations = incrementsPerIteration.length;
+
+    for (int i = 0; i < numIterations; i++) {
+      int numIncrements = incrementsPerIteration[i];
+      long expCounts = expCountsPerIteration[i];
+      // Objects are absent if they were zero both this iteration
+      // and the last -- if only this one, we need to report zero.
+      boolean expAbsent = ((expCounts == 0) && ((i == 0) || (expCountsPerIteration[i - 1] == 0)));
+
+      // given (for this iteration)
+      for (int j = 0; j < numIncrements; j++) {
+        counter.incrementCount(ANY_OBJECT);
+      }
+
+      // when (for this iteration)
+      Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
+
+      // then (for this iteration)
+      if (expAbsent) {
+        assertThat(counts).doesNotContainKey(ANY_OBJECT);
+      }
+      else {
+        assertThat(counts.get(ANY_OBJECT)).isEqualTo(expCounts);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlotBasedCounterTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlotBasedCounterTest.java b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlotBasedCounterTest.java
new file mode 100644
index 0000000..e4f7dcc
--- /dev/null
+++ b/examples/storm-starter/test/jvm/org/apache/storm/starter/tools/SlotBasedCounterTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.tools;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.fest.assertions.api.Assertions.assertThat;
+
+public class SlotBasedCounterTest {
+
+  private static final int ANY_NUM_SLOTS = 1;
+  private static final int ANY_SLOT = 0;
+  private static final Object ANY_OBJECT = "ANY_OBJECT";
+
+  @DataProvider
+  public Object[][] illegalNumSlotsData() {
+    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumSlotsData")
+  public void negativeOrZeroNumSlotsShouldThrowIAE(int numSlots) {
+    new SlotBasedCounter<Object>(numSlots);
+  }
+
+  @DataProvider
+  public Object[][] legalNumSlotsData() {
+    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
+  }
+
+  @Test(dataProvider = "legalNumSlotsData")
+  public void positiveNumSlotsShouldBeOk(int numSlots) {
+    new SlotBasedCounter<Object>(numSlots);
+  }
+
+  @Test
+  public void newInstanceShouldHaveEmptyCounts() {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
+
+    // when
+    Map<Object, Long> counts = counter.getCounts();
+
+    // then
+    assertThat(counts).isEmpty();
+  }
+
+  @Test
+  public void shouldReturnNonEmptyCountsWhenAtLeastOneObjectWasCounted() {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
+    counter.incrementCount(ANY_OBJECT, ANY_SLOT);
+
+    // when
+    Map<Object, Long> counts = counter.getCounts();
+
+    // then
+    assertThat(counts).isNotEmpty();
+
+    // additional tests that go beyond what this test is primarily about
+    assertThat(counts.size()).isEqualTo(1);
+    assertThat(counts.get(ANY_OBJECT)).isEqualTo(1);
+  }
+
+  @DataProvider
+  public Object[][] incrementCountData() {
+    return new Object[][]{ { new String[]{ "foo", "bar" }, new int[]{ 3, 2 } } };
+  }
+
+  @Test(dataProvider = "incrementCountData")
+  public void shouldIncrementCount(Object[] objects, int[] expCounts) {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
+
+    // when
+    for (int i = 0; i < objects.length; i++) {
+      Object obj = objects[i];
+      int numIncrements = expCounts[i];
+      for (int j = 0; j < numIncrements; j++) {
+        counter.incrementCount(obj, ANY_SLOT);
+      }
+    }
+
+    // then
+    for (int i = 0; i < objects.length; i++) {
+      assertThat(counter.getCount(objects[i], ANY_SLOT)).isEqualTo(expCounts[i]);
+    }
+    assertThat(counter.getCount("nonexistentObject", ANY_SLOT)).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldReturnZeroForNonexistentObject() {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
+
+    // when
+    counter.incrementCount("somethingElse", ANY_SLOT);
+
+    // then
+    assertThat(counter.getCount("nonexistentObject", ANY_SLOT)).isEqualTo(0);
+  }
+
+  @Test
+  public void shouldIncrementCountOnlyOneSlotAtATime() {
+    // given
+    int numSlots = 3;
+    Object obj = Long.valueOf(10);
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(numSlots);
+
+    // when (empty)
+    // then
+    assertThat(counter.getCount(obj, 0)).isEqualTo(0);
+    assertThat(counter.getCount(obj, 1)).isEqualTo(0);
+    assertThat(counter.getCount(obj, 2)).isEqualTo(0);
+
+    // when
+    counter.incrementCount(obj, 1);
+
+    // then
+    assertThat(counter.getCount(obj, 0)).isEqualTo(0);
+    assertThat(counter.getCount(obj, 1)).isEqualTo(1);
+    assertThat(counter.getCount(obj, 2)).isEqualTo(0);
+  }
+
+  @Test
+  public void wipeSlotShouldSetAllCountsInSlotToZero() {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
+    Object countWasOne = "countWasOne";
+    Object countWasThree = "countWasThree";
+    counter.incrementCount(countWasOne, ANY_SLOT);
+    counter.incrementCount(countWasThree, ANY_SLOT);
+    counter.incrementCount(countWasThree, ANY_SLOT);
+    counter.incrementCount(countWasThree, ANY_SLOT);
+
+    // when
+    counter.wipeSlot(ANY_SLOT);
+
+    // then
+    assertThat(counter.getCount(countWasOne, ANY_SLOT)).isEqualTo(0);
+    assertThat(counter.getCount(countWasThree, ANY_SLOT)).isEqualTo(0);
+  }
+
+  @Test
+  public void wipeZerosShouldRemoveAnyObjectsWithZeroTotalCount() {
+    // given
+    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(2);
+    int wipeSlot = 0;
+    int otherSlot = 1;
+    Object willBeRemoved = "willBeRemoved";
+    Object willContinueToBeTracked = "willContinueToBeTracked";
+    counter.incrementCount(willBeRemoved, wipeSlot);
+    counter.incrementCount(willContinueToBeTracked, wipeSlot);
+    counter.incrementCount(willContinueToBeTracked, otherSlot);
+
+    // when
+    counter.wipeSlot(wipeSlot);
+    counter.wipeZeros();
+
+    // then
+    assertThat(counter.getCounts()).doesNotContainKey(willBeRemoved);
+    assertThat(counter.getCounts()).containsKey(willContinueToBeTracked);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
deleted file mode 100644
index 278a513..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.MockTupleHelpers;
-import com.google.common.collect.Lists;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-public class IntermediateRankingsBoltTest {
-
-  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-  private static final Object ANY_OBJECT = new Object();
-  private static final int ANY_TOPN = 10;
-  private static final long ANY_COUNT = 42;
-
-  private Tuple mockRankableTuple(Object obj, long count) {
-    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-    when(tuple.getValues()).thenReturn(Lists.newArrayList(ANY_OBJECT, ANY_COUNT));
-    return tuple;
-  }
-
-  @DataProvider
-  public Object[][] illegalTopN() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
-  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
-    new IntermediateRankingsBolt(topN);
-  }
-
-  @DataProvider
-  public Object[][] illegalEmitFrequency() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
-  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
-    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-  }
-
-  @DataProvider
-  public Object[][] legalTopN() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalTopN")
-  public void positiveTopNShouldBeOk(int topN) {
-    new IntermediateRankingsBolt(topN);
-  }
-
-  @DataProvider
-  public Object[][] legalEmitFrequency() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalEmitFrequency")
-  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
-    new IntermediateRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-  }
-
-  @Test
-  public void shouldEmitSomethingIfTickTupleIsReceived() {
-    // given
-    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-    BasicOutputCollector collector = mock(BasicOutputCollector.class);
-    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-    // when
-    bolt.execute(tickTuple, collector);
-
-    // then
-    // verifyZeroInteractions(collector);
-    verify(collector).emit(any(Values.class));
-  }
-
-  @Test
-  public void shouldEmitNothingIfNormalTupleIsReceived() {
-    // given
-    Tuple normalTuple = mockRankableTuple(ANY_OBJECT, ANY_COUNT);
-    BasicOutputCollector collector = mock(BasicOutputCollector.class);
-    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-    // when
-    bolt.execute(normalTuple, collector);
-
-    // then
-    verifyZeroInteractions(collector);
-  }
-
-  @Test
-  public void shouldDeclareOutputFields() {
-    // given
-    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-    // when
-    bolt.declareOutputFields(declarer);
-
-    // then
-    verify(declarer, times(1)).declare(any(Fields.class));
-  }
-
-  @Test
-  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-    // given
-    IntermediateRankingsBolt bolt = new IntermediateRankingsBolt();
-
-    // when
-    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-    // then
-    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
deleted file mode 100644
index ecb1216..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.MockTupleHelpers;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-public class RollingCountBoltTest {
-
-  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-
-  private Tuple mockNormalTuple(Object obj) {
-    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-    when(tuple.getValue(0)).thenReturn(obj);
-    return tuple;
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Test
-  public void shouldEmitNothingIfNoObjectHasBeenCountedYetAndTickTupleIsReceived() {
-    // given
-    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-    RollingCountBolt bolt = new RollingCountBolt();
-    Map conf = mock(Map.class);
-    TopologyContext context = mock(TopologyContext.class);
-    OutputCollector collector = mock(OutputCollector.class);
-    bolt.prepare(conf, context, collector);
-
-    // when
-    bolt.execute(tickTuple);
-
-    // then
-    verifyZeroInteractions(collector);
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Test
-  public void shouldEmitSomethingIfAtLeastOneObjectWasCountedAndTickTupleIsReceived() {
-    // given
-    Tuple normalTuple = mockNormalTuple(new Object());
-    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-
-    RollingCountBolt bolt = new RollingCountBolt();
-    Map conf = mock(Map.class);
-    TopologyContext context = mock(TopologyContext.class);
-    OutputCollector collector = mock(OutputCollector.class);
-    bolt.prepare(conf, context, collector);
-
-    // when
-    bolt.execute(normalTuple);
-    bolt.execute(tickTuple);
-
-    // then
-    verify(collector).emit(any(Values.class));
-  }
-
-  @Test
-  public void shouldDeclareOutputFields() {
-    // given
-    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-    RollingCountBolt bolt = new RollingCountBolt();
-
-    // when
-    bolt.declareOutputFields(declarer);
-
-    // then
-    verify(declarer, times(1)).declare(any(Fields.class));
-
-  }
-
-  @Test
-  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-    // given
-    RollingCountBolt bolt = new RollingCountBolt();
-
-    // when
-    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-    // then
-    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
deleted file mode 100644
index a6af931..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.bolt;
-
-import backtype.storm.Config;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.MockTupleHelpers;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import storm.starter.tools.Rankings;
-
-import java.util.Map;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-public class TotalRankingsBoltTest {
-
-  private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
-  private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
-  private static final Object ANY_OBJECT = new Object();
-  private static final int ANY_TOPN = 10;
-  private static final long ANY_COUNT = 42;
-
-  private Tuple mockRankingsTuple(Object obj, long count) {
-    Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
-    Rankings rankings = mock(Rankings.class);
-    when(tuple.getValue(0)).thenReturn(rankings);
-    return tuple;
-  }
-
-  @DataProvider
-  public Object[][] illegalTopN() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
-  public void negativeOrZeroTopNShouldThrowIAE(int topN) {
-    new TotalRankingsBolt(topN);
-  }
-
-  @DataProvider
-  public Object[][] illegalEmitFrequency() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
-  public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
-    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-  }
-
-  @DataProvider
-  public Object[][] legalTopN() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalTopN")
-  public void positiveTopNShouldBeOk(int topN) {
-    new TotalRankingsBolt(topN);
-  }
-
-  @DataProvider
-  public Object[][] legalEmitFrequency() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalEmitFrequency")
-  public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
-    new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
-  }
-
-  @Test
-  public void shouldEmitSomethingIfTickTupleIsReceived() {
-    // given
-    Tuple tickTuple = MockTupleHelpers.mockTickTuple();
-    BasicOutputCollector collector = mock(BasicOutputCollector.class);
-    TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-    // when
-    bolt.execute(tickTuple, collector);
-
-    // then
-    // verifyZeroInteractions(collector);
-    verify(collector).emit(any(Values.class));
-  }
-
-  @Test
-  public void shouldEmitNothingIfNormalTupleIsReceived() {
-    // given
-    Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
-    BasicOutputCollector collector = mock(BasicOutputCollector.class);
-    TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-    // when
-    bolt.execute(normalTuple, collector);
-
-    // then
-    verifyZeroInteractions(collector);
-  }
-
-  @Test
-  public void shouldDeclareOutputFields() {
-    // given
-    OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
-    TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-    // when
-    bolt.declareOutputFields(declarer);
-
-    // then
-    verify(declarer, times(1)).declare(any(Fields.class));
-  }
-
-  @Test
-  public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
-    // given
-    TotalRankingsBolt bolt = new TotalRankingsBolt();
-
-    // when
-    Map<String, Object> componentConfig = bolt.getComponentConfiguration();
-
-    // then
-    assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
-    assertThat(emitFrequencyInSeconds).isGreaterThan(0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java b/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
deleted file mode 100644
index fe4d987..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/NthLastModifiedTimeTrackerTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.utils.Time;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-
-public class NthLastModifiedTimeTrackerTest {
-
-  private static final int ANY_NUM_TIMES_TO_TRACK = 3;
-  private static final int MILLIS_IN_SEC = 1000;
-
-  @DataProvider
-  public Object[][] illegalNumTimesData() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumTimesData")
-  public void negativeOrZeroNumTimesToTrackShouldThrowIAE(int numTimesToTrack) {
-    new NthLastModifiedTimeTracker(numTimesToTrack);
-  }
-
-  @DataProvider
-  public Object[][] legalNumTimesData() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalNumTimesData")
-  public void positiveNumTimesToTrackShouldBeOk(int numTimesToTrack) {
-    new NthLastModifiedTimeTracker(numTimesToTrack);
-  }
-
-  @DataProvider
-  public Object[][] whenNotYetMarkedAsModifiedData() {
-    return new Object[][]{ { 0 }, { 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 8 }, { 10 } };
-  }
-
-  @Test(dataProvider = "whenNotYetMarkedAsModifiedData")
-  public void shouldReturnCorrectModifiedTimeEvenWhenNotYetMarkedAsModified(int secondsToAdvance) {
-    // given
-    Time.startSimulating();
-    NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(ANY_NUM_TIMES_TO_TRACK);
-
-    // when
-    advanceSimulatedTimeBy(secondsToAdvance);
-    int seconds = tracker.secondsSinceOldestModification();
-
-    // then
-    assertThat(seconds).isEqualTo(secondsToAdvance);
-
-    // cleanup
-    Time.stopSimulating();
-  }
-
-  @DataProvider
-  public Object[][] simulatedTrackerIterations() {
-    return new Object[][]{ { 1, new int[]{ 0, 1 }, new int[]{ 0, 0 } }, { 1, new int[]{ 0, 2 }, new int[]{ 0, 0 } },
-        { 2, new int[]{ 2, 2 }, new int[]{ 2, 2 } }, { 2, new int[]{ 0, 4 }, new int[]{ 0, 4 } },
-        { 1, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
-        { 1, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 0, 0, 0, 0, 0, 0, 0 } },
-        { 2, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 1, 1, 1, 1, 1, 1 } },
-        { 2, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 2, 2, 2, 2, 2, 2 } },
-        { 2, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 2, 3, 4, 5, 6, 7 } },
-        { 3, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 2, 2, 2, 2, 2 } },
-        { 3, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 5, 7, 9, 11, 13 } },
-        { 3, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 4, 4, 4, 4, 4 } },
-        { 4, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 3, 3, 3, 3 } },
-        { 4, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 9, 12, 15, 18 } },
-        { 4, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 6, 6, 6, 6 } },
-        { 5, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 4, 4, 4 } },
-        { 5, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 14, 18, 22 } },
-        { 5, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 8, 8, 8 } },
-        { 6, new int[]{ 1, 1, 1, 1, 1, 1, 1 }, new int[]{ 1, 2, 3, 4, 5, 5, 5 } },
-        { 6, new int[]{ 1, 2, 3, 4, 5, 6, 7 }, new int[]{ 1, 3, 6, 10, 15, 20, 25 } },
-        { 6, new int[]{ 2, 2, 2, 2, 2, 2, 2 }, new int[]{ 2, 4, 6, 8, 10, 10, 10 } },
-        { 3, new int[]{ 1, 2, 3 }, new int[]{ 1, 3, 5 } } };
-  }
-
-  @Test(dataProvider = "simulatedTrackerIterations")
-  public void shouldReturnCorrectModifiedTimeWhenMarkedAsModified(int numTimesToTrack,
-      int[] secondsToAdvancePerIteration, int[] expLastModifiedTimes) {
-    // given
-    Time.startSimulating();
-    NthLastModifiedTimeTracker tracker = new NthLastModifiedTimeTracker(numTimesToTrack);
-
-    int[] modifiedTimes = new int[expLastModifiedTimes.length];
-
-    // when
-    int i = 0;
-    for (int secondsToAdvance : secondsToAdvancePerIteration) {
-      advanceSimulatedTimeBy(secondsToAdvance);
-      tracker.markAsModified();
-      modifiedTimes[i] = tracker.secondsSinceOldestModification();
-      i++;
-    }
-
-    // then
-    assertThat(modifiedTimes).isEqualTo(expLastModifiedTimes);
-
-    // cleanup
-    Time.stopSimulating();
-  }
-
-  private void advanceSimulatedTimeBy(int seconds) {
-    Time.advanceTime(seconds * MILLIS_IN_SEC);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java b/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
deleted file mode 100644
index e83f922..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.tuple.Tuple;
-import com.google.common.collect.Lists;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-import static org.mockito.Mockito.*;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-public class RankableObjectWithFieldsTest {
-
-  private static final Object ANY_OBJECT = new Object();
-  private static final long ANY_COUNT = 271;
-  private static final String ANY_FIELD = "someAdditionalField";
-  private static final int GREATER_THAN = 1;
-  private static final int EQUAL_TO = 0;
-  private static final int SMALLER_THAN = -1;
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void constructorWithNullObjectAndNoFieldsShouldThrowIAE() {
-    new RankableObjectWithFields(null, ANY_COUNT);
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void constructorWithNullObjectAndFieldsShouldThrowIAE() {
-    Object someAdditionalField = new Object();
-    new RankableObjectWithFields(null, ANY_COUNT, someAdditionalField);
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void constructorWithNegativeCountAndNoFieldsShouldThrowIAE() {
-    new RankableObjectWithFields(ANY_OBJECT, -1);
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class)
-  public void constructorWithNegativeCountAndFieldsShouldThrowIAE() {
-    Object someAdditionalField = new Object();
-    new RankableObjectWithFields(ANY_OBJECT, -1, someAdditionalField);
-  }
-
-  @Test
-  public void shouldBeEqualToItself() {
-    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
-    assertThat(r).isEqualTo(r);
-  }
-
-  @DataProvider
-  public Object[][] otherClassesData() {
-    return new Object[][]{ { new String("foo") }, { new Object() }, { Integer.valueOf(4) }, { Lists.newArrayList(7, 8,
-        9) } };
-  }
-
-  @Test(dataProvider = "otherClassesData")
-  public void shouldNotBeEqualToInstancesOfOtherClasses(Object notARankable) {
-    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT);
-    assertFalse(r.equals(notARankable), r + " is equal to " + notARankable + " but it should not be");
-  }
-
-  @DataProvider
-  public Object[][] falseDuplicatesData() {
-    return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1) },
-        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("Foo", 1) },
-        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("FOO", 1) },
-        { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 1) },
-        { new RankableObjectWithFields("", 0), new RankableObjectWithFields("", 1) }, { new RankableObjectWithFields("",
-        1), new RankableObjectWithFields("bar", 1) } };
-  }
-
-  @Test(dataProvider = "falseDuplicatesData")
-  public void shouldNotBeEqualToFalseDuplicates(RankableObjectWithFields r, RankableObjectWithFields falseDuplicate) {
-    assertFalse(r.equals(falseDuplicate), r + " is equal to " + falseDuplicate + " but it should not be");
-  }
-
-  @Test(dataProvider = "falseDuplicatesData")
-  public void shouldHaveDifferentHashCodeThanFalseDuplicates(RankableObjectWithFields r,
-      RankableObjectWithFields falseDuplicate) {
-    assertThat(r.hashCode()).isNotEqualTo(falseDuplicate.hashCode());
-  }
-
-  @DataProvider
-  public Object[][] trueDuplicatesData() {
-    return new Object[][]{ { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0) },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0, "someOtherField") },
-        { new RankableObjectWithFields("foo", 0, "someField"), new RankableObjectWithFields("foo", 0,
-            "someOtherField") } };
-  }
-
-  @Test(dataProvider = "trueDuplicatesData")
-  public void shouldBeEqualToTrueDuplicates(RankableObjectWithFields r, RankableObjectWithFields trueDuplicate) {
-    assertTrue(r.equals(trueDuplicate), r + " is not equal to " + trueDuplicate + " but it should be");
-  }
-
-  @Test(dataProvider = "trueDuplicatesData")
-  public void shouldHaveSameHashCodeAsTrueDuplicates(RankableObjectWithFields r,
-      RankableObjectWithFields trueDuplicate) {
-    assertThat(r.hashCode()).isEqualTo(trueDuplicate.hashCode());
-  }
-
-  @DataProvider
-  public Object[][] compareToData() {
-    return new Object[][]{ { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("foo", 0),
-        GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("foo", 0),
-        GREATER_THAN }, { new RankableObjectWithFields("foo", 1000), new RankableObjectWithFields("bar", 0),
-        GREATER_THAN }, { new RankableObjectWithFields("foo", 1), new RankableObjectWithFields("bar", 0),
-        GREATER_THAN }, { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 0), EQUAL_TO },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 0), EQUAL_TO },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1000), SMALLER_THAN },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("foo", 1), SMALLER_THAN },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1), SMALLER_THAN },
-        { new RankableObjectWithFields("foo", 0), new RankableObjectWithFields("bar", 1000), SMALLER_THAN }, };
-  }
-
-  @Test(dataProvider = "compareToData")
-  public void verifyCompareTo(RankableObjectWithFields first, RankableObjectWithFields second, int expCompareToValue) {
-    assertThat(first.compareTo(second)).isEqualTo(expCompareToValue);
-  }
-
-  @DataProvider
-  public Object[][] toStringData() {
-    return new Object[][]{ { new String("foo"), 0L }, { new String("BAR"), 8L } };
-  }
-
-  @Test(dataProvider = "toStringData")
-  public void toStringShouldContainStringRepresentationsOfObjectAndCount(Object obj, long count) {
-    // given
-    RankableObjectWithFields r = new RankableObjectWithFields(obj, count);
-
-    // when
-    String strRepresentation = r.toString();
-
-    // then
-    assertThat(strRepresentation).contains(obj.toString()).contains("" + count);
-  }
-
-  @Test
-  public void shouldReturnTheObject() {
-    // given
-    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
-
-    // when
-    Object obj = r.getObject();
-
-    // then
-    assertThat(obj).isEqualTo(ANY_OBJECT);
-  }
-
-  @Test
-  public void shouldReturnTheCount() {
-    // given
-    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
-
-    // when
-    long count = r.getCount();
-
-    // then
-    assertThat(count).isEqualTo(ANY_COUNT);
-  }
-
-  @DataProvider
-  public Object[][] fieldsData() {
-    return new Object[][]{ { ANY_OBJECT, ANY_COUNT, new Object[]{ ANY_FIELD } },
-        { "quux", 42L, new Object[]{ "one", "two", "three" } } };
-  }
-
-  @Test(dataProvider = "fieldsData")
-  public void shouldReturnTheFields(Object obj, long count, Object[] fields) {
-    // given
-    RankableObjectWithFields r = new RankableObjectWithFields(obj, count, fields);
-
-    // when
-    List<Object> actualFields = r.getFields();
-
-    // then
-    assertThat(actualFields).isEqualTo(Lists.newArrayList(fields));
-  }
-
-  @Test(expectedExceptions = UnsupportedOperationException.class)
-  public void fieldsShouldBeImmutable() {
-    // given
-    RankableObjectWithFields r = new RankableObjectWithFields(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
-
-    // when
-    List<Object> fields = r.getFields();
-    // try to modify the list, which should fail
-    fields.remove(0);
-
-    // then (exception)
-  }
-
-  @Test
-  public void shouldCreateRankableObjectFromTuple() {
-    // given
-    Tuple tuple = mock(Tuple.class);
-    List<Object> tupleValues = Lists.newArrayList(ANY_OBJECT, ANY_COUNT, ANY_FIELD);
-    when(tuple.getValues()).thenReturn(tupleValues);
-
-    // when
-    RankableObjectWithFields r = RankableObjectWithFields.from(tuple);
-
-    // then
-    assertThat(r.getObject()).isEqualTo(ANY_OBJECT);
-    assertThat(r.getCount()).isEqualTo(ANY_COUNT);
-    List<Object> fields = new ArrayList<Object>();
-    fields.add(ANY_FIELD);
-    assertThat(r.getFields()).isEqualTo(fields);
-
-  }
-
-  @DataProvider
-  public Object[][] copyData() {
-    return new Object[][]{ { new RankableObjectWithFields("foo", 0) }, { new RankableObjectWithFields("foo", 3,
-        "someOtherField") }, { new RankableObjectWithFields("foo", 0, "someField") } };
-  }
-
-  // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied?
-  //       The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields.
-  @Test(dataProvider = "copyData")
-  public void copyShouldReturnCopy(RankableObjectWithFields original) {
-    // given
-
-    // when
-    Rankable copy = original.copy();
-
-    // then
-    assertThat(copy.getObject()).isEqualTo(original.getObject());
-    assertThat(copy.getCount()).isEqualTo(original.getCount());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java b/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
deleted file mode 100644
index cab02cb..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/RankingsTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.jmock.lib.concurrent.Blitzer;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.List;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-
-public class RankingsTest {
-
-  private static final int ANY_TOPN = 42;
-  private static final Rankable ANY_RANKABLE = new RankableObjectWithFields("someObject", ANY_TOPN);
-  private static final Rankable ZERO = new RankableObjectWithFields("ZERO_COUNT", 0);
-  private static final Rankable A = new RankableObjectWithFields("A", 1);
-  private static final Rankable B = new RankableObjectWithFields("B", 2);
-  private static final Rankable C = new RankableObjectWithFields("C", 3);
-  private static final Rankable D = new RankableObjectWithFields("D", 4);
-  private static final Rankable E = new RankableObjectWithFields("E", 5);
-  private static final Rankable F = new RankableObjectWithFields("F", 6);
-  private static final Rankable G = new RankableObjectWithFields("G", 7);
-  private static final Rankable H = new RankableObjectWithFields("H", 8);
-
-  @DataProvider
-  public Object[][] illegalTopNData() {
-    return new Object[][]{ { 0 }, { -1 }, { -2 }, { -10 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopNData")
-  public void constructorWithNegativeOrZeroTopNShouldThrowIAE(int topN) {
-    new Rankings(topN);
-  }
-
-  @DataProvider
-  public Object[][] copyRankingsData() {
-    return new Object[][]{ { 5, Lists.newArrayList(A, B, C) }, { 2, Lists.newArrayList(A, B, C, D) },
-        { 1, Lists.newArrayList() }, { 1, Lists.newArrayList(A) }, { 1, Lists.newArrayList(A, B) } };
-  }
-
-  @Test(dataProvider = "copyRankingsData")
-  public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) {
-    // given
-    Rankings rankings = new Rankings(topN);
-    for (Rankable r : rankables) {
-      rankings.updateWith(r);
-    }
-
-    // when
-    Rankings copy = new Rankings(rankings);
-
-    // then
-    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
-    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
-  }
-
-  @DataProvider
-  public Object[][] defensiveCopyRankingsData() {
-    return new Object[][]{ { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) }, { 2, Lists.newArrayList(A, B, C,
-        D), Lists.newArrayList(E, F) }, { 1, Lists.newArrayList(), Lists.newArrayList(A) }, { 1, Lists.newArrayList(A),
-        Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO),
-        Lists.newArrayList() } };
-  }
-
-  @Test(dataProvider = "defensiveCopyRankingsData")
-  public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
-    // given
-    Rankings original = new Rankings(topN);
-    for (Rankable r : rankables) {
-      original.updateWith(r);
-    }
-    int expSize = original.size();
-    List<Rankable> expRankings = original.getRankings();
-
-    // when
-    Rankings copy = new Rankings(original);
-    for (Rankable r : changes) {
-      copy.updateWith(r);
-    }
-
-    // then
-    assertThat(original.size()).isEqualTo(expSize);
-    assertThat(original.getRankings()).isEqualTo(expRankings);
-  }
-
-  @DataProvider
-  public Object[][] legalTopNData() {
-    return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } };
-  }
-
-  @Test(dataProvider = "legalTopNData")
-  public void constructorWithPositiveTopNShouldBeOk(int topN) {
-    // given/when
-    Rankings rankings = new Rankings(topN);
-
-    // then
-    assertThat(rankings.maxSize()).isEqualTo(topN);
-  }
-
-  @Test
-  public void shouldHaveDefaultConstructor() {
-    new Rankings();
-  }
-
-  @Test
-  public void defaultConstructorShouldSetPositiveTopN() {
-    // given/when
-    Rankings rankings = new Rankings();
-
-    // then
-    assertThat(rankings.maxSize()).isGreaterThan(0);
-  }
-
-  @DataProvider
-  public Object[][] rankingsGrowData() {
-    return new Object[][]{ { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1), new RankableObjectWithFields(
-        "B", 2), new RankableObjectWithFields("C", 3)) }, { 2, Lists.newArrayList(new RankableObjectWithFields("A", 1),
-        new RankableObjectWithFields("B", 2), new RankableObjectWithFields("C", 3), new RankableObjectWithFields("D",
-        4)) } };
-  }
-
-  @Test(dataProvider = "rankingsGrowData")
-  public void sizeOfRankingsShouldNotGrowBeyondTopN(int topN, List<Rankable> rankables) {
-    // sanity check of the provided test data
-    assertThat(rankables.size()).overridingErrorMessage(
-        "The supplied test data is not correct: the number of rankables <%d> should be greater than <%d>",
-        rankables.size(), topN).isGreaterThan(topN);
-
-    // given
-    Rankings rankings = new Rankings(topN);
-
-    // when
-    for (Rankable r : rankables) {
-      rankings.updateWith(r);
-    }
-
-    // then
-    assertThat(rankings.size()).isLessThanOrEqualTo(rankings.maxSize());
-  }
-
-  @DataProvider
-  public Object[][] simulatedRankingsData() {
-    return new Object[][]{ { Lists.newArrayList(A), Lists.newArrayList(A) }, { Lists.newArrayList(B, D, A, C),
-        Lists.newArrayList(D, C, B, A) }, { Lists.newArrayList(B, F, A, C, D, E), Lists.newArrayList(F, E, D, C, B,
-        A) }, { Lists.newArrayList(G, B, F, A, C, D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } };
-  }
-
-  @Test(dataProvider = "simulatedRankingsData")
-  public void shouldCorrectlyRankWhenUpdatedWithRankables(List<Rankable> unsorted, List<Rankable> expSorted) {
-    // given
-    Rankings rankings = new Rankings(unsorted.size());
-
-    // when
-    for (Rankable r : unsorted) {
-      rankings.updateWith(r);
-    }
-
-    // then
-    assertThat(rankings.getRankings()).isEqualTo(expSorted);
-  }
-
-  @Test(dataProvider = "simulatedRankingsData")
-  public void shouldCorrectlyRankWhenEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
-      List<Rankable> expSorted) {
-    // given
-    Rankings rankings = new Rankings(unsorted.size());
-    Rankings otherRankings = new Rankings(rankings.maxSize());
-    for (Rankable r : unsorted) {
-      otherRankings.updateWith(r);
-    }
-
-    // when
-    rankings.updateWith(otherRankings);
-
-    // then
-    assertThat(rankings.getRankings()).isEqualTo(expSorted);
-  }
-
-  @Test(dataProvider = "simulatedRankingsData")
-  public void shouldCorrectlyRankWhenUpdatedWithEmptyOtherRankings(List<Rankable> unsorted, List<Rankable> expSorted) {
-    // given
-    Rankings rankings = new Rankings(unsorted.size());
-    for (Rankable r : unsorted) {
-      rankings.updateWith(r);
-    }
-    Rankings emptyRankings = new Rankings(ANY_TOPN);
-
-    // when
-    rankings.updateWith(emptyRankings);
-
-    // then
-    assertThat(rankings.getRankings()).isEqualTo(expSorted);
-  }
-
-  @DataProvider
-  public Object[][] simulatedRankingsAndOtherRankingsData() {
-    return new Object[][]{ { Lists.newArrayList(A), Lists.newArrayList(A), Lists.newArrayList(A) },
-        { Lists.newArrayList(A, C), Lists.newArrayList(B, D), Lists.newArrayList(D, C, B, A) }, { Lists.newArrayList(B,
-        F, A), Lists.newArrayList(C, D, E), Lists.newArrayList(F, E, D, C, B, A) }, { Lists.newArrayList(G, B, F, A, C),
-        Lists.newArrayList(D, E, H), Lists.newArrayList(H, G, F, E, D, C, B, A) } };
-  }
-
-  @Test(dataProvider = "simulatedRankingsAndOtherRankingsData")
-  public void shouldCorrectlyRankWhenNotEmptyAndUpdatedWithOtherRankings(List<Rankable> unsorted,
-      List<Rankable> unsortedForOtherRankings, List<Rankable> expSorted) {
-    // given
-    Rankings rankings = new Rankings(expSorted.size());
-    for (Rankable r : unsorted) {
-      rankings.updateWith(r);
-    }
-    Rankings otherRankings = new Rankings(unsortedForOtherRankings.size());
-    for (Rankable r : unsortedForOtherRankings) {
-      otherRankings.updateWith(r);
-    }
-
-    // when
-    rankings.updateWith(otherRankings);
-
-    // then
-    assertThat(rankings.getRankings()).isEqualTo(expSorted);
-  }
-
-  @DataProvider
-  public Object[][] duplicatesData() {
-    Rankable A1 = new RankableObjectWithFields("A", 1);
-    Rankable A2 = new RankableObjectWithFields("A", 2);
-    Rankable A3 = new RankableObjectWithFields("A", 3);
-    return new Object[][]{ { Lists.newArrayList(ANY_RANKABLE, ANY_RANKABLE, ANY_RANKABLE) }, { Lists.newArrayList(A1,
-        A2, A3) }, };
-  }
-
-  @Test(dataProvider = "duplicatesData")
-  public void shouldNotRankDuplicateObjectsMoreThanOnce(List<Rankable> duplicates) {
-    // given
-    Rankings rankings = new Rankings(duplicates.size());
-
-    // when
-    for (Rankable r : duplicates) {
-      rankings.updateWith(r);
-    }
-
-    // then
-    assertThat(rankings.size()).isEqualTo(1);
-  }
-
-  @DataProvider
-  public Object[][] removeZeroRankingsData() {
-    return new Object[][]{ { Lists.newArrayList(A, ZERO), Lists.newArrayList(A) }, { Lists.newArrayList(A),
-        Lists.newArrayList(A) }, { Lists.newArrayList(ZERO, A), Lists.newArrayList(A) }, { Lists.newArrayList(ZERO),
-        Lists.newArrayList() }, { Lists.newArrayList(ZERO, new RankableObjectWithFields("ZERO2", 0)),
-        Lists.newArrayList() }, { Lists.newArrayList(B, ZERO, new RankableObjectWithFields("ZERO2", 0), D,
-        new RankableObjectWithFields("ZERO3", 0), new RankableObjectWithFields("ZERO4", 0), C), Lists.newArrayList(D, C,
-        B) }, { Lists.newArrayList(A, ZERO, B), Lists.newArrayList(B, A) } };
-  }
-
-  @Test(dataProvider = "removeZeroRankingsData")
-  public void shouldRemoveZeroCounts(List<Rankable> unsorted, List<Rankable> expSorted) {
-    // given
-    Rankings rankings = new Rankings(unsorted.size());
-    for (Rankable r : unsorted) {
-      rankings.updateWith(r);
-    }
-
-    // when
-    rankings.pruneZeroCounts();
-
-    // then
-    assertThat(rankings.getRankings()).isEqualTo(expSorted);
-  }
-
-  @Test
-  public void updatingWithNewRankablesShouldBeThreadSafe() throws InterruptedException {
-    // given
-    final List<Rankable> entries = ImmutableList.of(A, B, C, D);
-    final Rankings rankings = new Rankings(entries.size());
-
-    // We are capturing exceptions thrown in Blitzer's child threads into this data structure so that we can properly
-    // pass/fail this test.  The reason is that Blitzer doesn't report exceptions, which is a known bug in Blitzer
-    // (JMOCK-263).  See https://github.com/jmock-developers/jmock-library/issues/22 for more information.
-    final List<Exception> exceptions = Lists.newArrayList();
-    Blitzer blitzer = new Blitzer(1000);
-
-    // when
-    blitzer.blitz(new Runnable() {
-      public void run() {
-        for (Rankable r : entries) {
-          try {
-            rankings.updateWith(r);
-          }
-          catch (RuntimeException e) {
-            synchronized(exceptions) {
-              exceptions.add(e);
-            }
-          }
-        }
-      }
-    });
-    blitzer.shutdown();
-
-    // then
-    //
-    if (!exceptions.isEmpty()) {
-      for (Exception e : exceptions) {
-        System.err.println(Throwables.getStackTraceAsString(e));
-      }
-    }
-    assertThat(exceptions).isEmpty();
-  }
-
-  @Test(dataProvider = "copyRankingsData")
-  public void copyShouldReturnCopy(int topN, List<Rankable> rankables) {
-    // given
-    Rankings rankings = new Rankings(topN);
-    for (Rankable r : rankables) {
-      rankings.updateWith(r);
-    }
-
-    // when
-    Rankings copy = rankings.copy();
-
-    // then
-    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
-    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
-  }
-
-  @Test(dataProvider = "defensiveCopyRankingsData")
-  public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
-    // given
-    Rankings original = new Rankings(topN);
-    for (Rankable r : rankables) {
-      original.updateWith(r);
-    }
-    int expSize = original.size();
-    List<Rankable> expRankings = original.getRankings();
-
-    // when
-    Rankings copy = original.copy();
-    for (Rankable r : changes) {
-      copy.updateWith(r);
-    }
-    copy.pruneZeroCounts();
-
-    // then
-    assertThat(original.size()).isEqualTo(expSize);
-    assertThat(original.getRankings()).isEqualTo(expRankings);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/tools/SlidingWindowCounterTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/SlidingWindowCounterTest.java b/examples/storm-starter/test/jvm/storm/starter/tools/SlidingWindowCounterTest.java
deleted file mode 100644
index 920bf01..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/SlidingWindowCounterTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-
-public class SlidingWindowCounterTest {
-
-  private static final int ANY_WINDOW_LENGTH_IN_SLOTS = 2;
-  private static final Object ANY_OBJECT = "ANY_OBJECT";
-
-  @DataProvider
-  public Object[][] illegalWindowLengths() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 }, { 1 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalWindowLengths")
-  public void lessThanTwoSlotsShouldThrowIAE(int windowLengthInSlots) {
-    new SlidingWindowCounter<Object>(windowLengthInSlots);
-  }
-
-  @DataProvider
-  public Object[][] legalWindowLengths() {
-    return new Object[][]{ { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalWindowLengths")
-  public void twoOrMoreSlotsShouldBeValid(int windowLengthInSlots) {
-    new SlidingWindowCounter<Object>(windowLengthInSlots);
-  }
-
-  @Test
-  public void newInstanceShouldHaveEmptyCounts() {
-    // given
-    SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(ANY_WINDOW_LENGTH_IN_SLOTS);
-
-    // when
-    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
-
-    // then
-    assertThat(counts).isEmpty();
-  }
-
-  @DataProvider
-  public Object[][] simulatedCounterIterations() {
-    return new Object[][]{ { 2, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 2, 0, 1, 1, 0, 0 } },
-        { 3, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 2, 1, 1, 1, 0 } },
-        { 4, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 3, 1, 1, 1 } },
-        { 5, new int[]{ 3, 2, 0, 0, 1, 0, 0, 0 }, new long[]{ 3, 5, 5, 5, 6, 3, 1, 1 } },
-        { 5, new int[]{ 3, 11, 5, 13, 7, 17, 0, 3, 50, 600, 7000 },
-            new long[]{ 3, 14, 19, 32, 39, 53, 42, 40, 77, 670, 7653 } }, };
-  }
-
-  @Test(dataProvider = "simulatedCounterIterations")
-  public void testCounterWithSimulatedRuns(int windowLengthInSlots, int[] incrementsPerIteration,
-      long[] expCountsPerIteration) {
-    // given
-    SlidingWindowCounter<Object> counter = new SlidingWindowCounter<Object>(windowLengthInSlots);
-    int numIterations = incrementsPerIteration.length;
-
-    for (int i = 0; i < numIterations; i++) {
-      int numIncrements = incrementsPerIteration[i];
-      long expCounts = expCountsPerIteration[i];
-      // Objects are absent if they were zero both this iteration
-      // and the last -- if only this one, we need to report zero.
-      boolean expAbsent = ((expCounts == 0) && ((i == 0) || (expCountsPerIteration[i - 1] == 0)));
-
-      // given (for this iteration)
-      for (int j = 0; j < numIncrements; j++) {
-        counter.incrementCount(ANY_OBJECT);
-      }
-
-      // when (for this iteration)
-      Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
-
-      // then (for this iteration)
-      if (expAbsent) {
-        assertThat(counts).doesNotContainKey(ANY_OBJECT);
-      }
-      else {
-        assertThat(counts.get(ANY_OBJECT)).isEqualTo(expCounts);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/examples/storm-starter/test/jvm/storm/starter/tools/SlotBasedCounterTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/SlotBasedCounterTest.java b/examples/storm-starter/test/jvm/storm/starter/tools/SlotBasedCounterTest.java
deleted file mode 100644
index 3ad042b..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/SlotBasedCounterTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import java.util.Map;
-
-import static org.fest.assertions.api.Assertions.assertThat;
-
-public class SlotBasedCounterTest {
-
-  private static final int ANY_NUM_SLOTS = 1;
-  private static final int ANY_SLOT = 0;
-  private static final Object ANY_OBJECT = "ANY_OBJECT";
-
-  @DataProvider
-  public Object[][] illegalNumSlotsData() {
-    return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
-  }
-
-  @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalNumSlotsData")
-  public void negativeOrZeroNumSlotsShouldThrowIAE(int numSlots) {
-    new SlotBasedCounter<Object>(numSlots);
-  }
-
-  @DataProvider
-  public Object[][] legalNumSlotsData() {
-    return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
-  }
-
-  @Test(dataProvider = "legalNumSlotsData")
-  public void positiveNumSlotsShouldBeOk(int numSlots) {
-    new SlotBasedCounter<Object>(numSlots);
-  }
-
-  @Test
-  public void newInstanceShouldHaveEmptyCounts() {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
-
-    // when
-    Map<Object, Long> counts = counter.getCounts();
-
-    // then
-    assertThat(counts).isEmpty();
-  }
-
-  @Test
-  public void shouldReturnNonEmptyCountsWhenAtLeastOneObjectWasCounted() {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
-    counter.incrementCount(ANY_OBJECT, ANY_SLOT);
-
-    // when
-    Map<Object, Long> counts = counter.getCounts();
-
-    // then
-    assertThat(counts).isNotEmpty();
-
-    // additional tests that go beyond what this test is primarily about
-    assertThat(counts.size()).isEqualTo(1);
-    assertThat(counts.get(ANY_OBJECT)).isEqualTo(1);
-  }
-
-  @DataProvider
-  public Object[][] incrementCountData() {
-    return new Object[][]{ { new String[]{ "foo", "bar" }, new int[]{ 3, 2 } } };
-  }
-
-  @Test(dataProvider = "incrementCountData")
-  public void shouldIncrementCount(Object[] objects, int[] expCounts) {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
-
-    // when
-    for (int i = 0; i < objects.length; i++) {
-      Object obj = objects[i];
-      int numIncrements = expCounts[i];
-      for (int j = 0; j < numIncrements; j++) {
-        counter.incrementCount(obj, ANY_SLOT);
-      }
-    }
-
-    // then
-    for (int i = 0; i < objects.length; i++) {
-      assertThat(counter.getCount(objects[i], ANY_SLOT)).isEqualTo(expCounts[i]);
-    }
-    assertThat(counter.getCount("nonexistentObject", ANY_SLOT)).isEqualTo(0);
-  }
-
-  @Test
-  public void shouldReturnZeroForNonexistentObject() {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
-
-    // when
-    counter.incrementCount("somethingElse", ANY_SLOT);
-
-    // then
-    assertThat(counter.getCount("nonexistentObject", ANY_SLOT)).isEqualTo(0);
-  }
-
-  @Test
-  public void shouldIncrementCountOnlyOneSlotAtATime() {
-    // given
-    int numSlots = 3;
-    Object obj = Long.valueOf(10);
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(numSlots);
-
-    // when (empty)
-    // then
-    assertThat(counter.getCount(obj, 0)).isEqualTo(0);
-    assertThat(counter.getCount(obj, 1)).isEqualTo(0);
-    assertThat(counter.getCount(obj, 2)).isEqualTo(0);
-
-    // when
-    counter.incrementCount(obj, 1);
-
-    // then
-    assertThat(counter.getCount(obj, 0)).isEqualTo(0);
-    assertThat(counter.getCount(obj, 1)).isEqualTo(1);
-    assertThat(counter.getCount(obj, 2)).isEqualTo(0);
-  }
-
-  @Test
-  public void wipeSlotShouldSetAllCountsInSlotToZero() {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(ANY_NUM_SLOTS);
-    Object countWasOne = "countWasOne";
-    Object countWasThree = "countWasThree";
-    counter.incrementCount(countWasOne, ANY_SLOT);
-    counter.incrementCount(countWasThree, ANY_SLOT);
-    counter.incrementCount(countWasThree, ANY_SLOT);
-    counter.incrementCount(countWasThree, ANY_SLOT);
-
-    // when
-    counter.wipeSlot(ANY_SLOT);
-
-    // then
-    assertThat(counter.getCount(countWasOne, ANY_SLOT)).isEqualTo(0);
-    assertThat(counter.getCount(countWasThree, ANY_SLOT)).isEqualTo(0);
-  }
-
-  @Test
-  public void wipeZerosShouldRemoveAnyObjectsWithZeroTotalCount() {
-    // given
-    SlotBasedCounter<Object> counter = new SlotBasedCounter<Object>(2);
-    int wipeSlot = 0;
-    int otherSlot = 1;
-    Object willBeRemoved = "willBeRemoved";
-    Object willContinueToBeTracked = "willContinueToBeTracked";
-    counter.incrementCount(willBeRemoved, wipeSlot);
-    counter.incrementCount(willContinueToBeTracked, wipeSlot);
-    counter.incrementCount(willContinueToBeTracked, otherSlot);
-
-    // when
-    counter.wipeSlot(wipeSlot);
-    counter.wipeZeros();
-
-    // then
-    assertThat(counter.getCounts()).doesNotContainKey(willBeRemoved);
-    assertThat(counter.getCounts()).containsKey(willContinueToBeTracked);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/README.md
----------------------------------------------------------------------
diff --git a/external/flux/README.md b/external/flux/README.md
index c4ef145..7043689 100644
--- a/external/flux/README.md
+++ b/external/flux/README.md
@@ -236,7 +236,7 @@ sentence-spout[1](org.apache.storm.flux.spouts.GenericShellSpout)
 ---------------- BOLTS ---------------
 splitsentence[1](org.apache.storm.flux.bolts.GenericShellBolt)
 log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
-count[1](backtype.storm.testing.TestWordCounter)
+count[1](org.apache.storm.testing.TestWordCounter)
 --------------- STREAMS ---------------
 sentence-spout --SHUFFLE--> splitsentence
 splitsentence --FIELDS--> count
@@ -255,7 +255,7 @@ definition consists of the following:
       * A list of spouts, each identified by a unique ID
       * A list of bolts, each identified by a unique ID
       * A list of "stream" objects representing a flow of tuples between spouts and bolts
-  4. **OR** (A JVM class that can produce a `backtype.storm.generated.StormTopology` instance:
+  4. **OR** (A JVM class that can produce a `org.apache.storm.generated.StormTopology` instance:
       * A `topologySource` definition.
 
 
@@ -270,13 +270,13 @@ config:
 # spout definitions
 spouts:
   - id: "spout-1"
-    className: "backtype.storm.testing.TestWordSpout"
+    className: "org.apache.storm.testing.TestWordSpout"
     parallelism: 1
 
 # bolt definitions
 bolts:
   - id: "bolt-1"
-    className: "backtype.storm.testing.TestWordCounter"
+    className: "org.apache.storm.testing.TestWordCounter"
     parallelism: 1
   - id: "bolt-2"
     className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
@@ -324,7 +324,7 @@ You would then be able to reference those properties by key in your `.yaml` file
 
 ```yaml
   - id: "zkHosts"
-    className: "storm.kafka.ZkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
     constructorArgs:
       - "${kafka.zookeeper.hosts}"
 ```
@@ -344,13 +344,13 @@ Components are essentially named object instances that are made available as con
 bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.
 
 Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
-the following will make an instance of the `storm.kafka.StringScheme` class available as a reference under the key
-`"stringScheme"` . This assumes the `storm.kafka.StringScheme` has a default constructor.
+the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key
+`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor.
 
 ```yaml
 components:
   - id: "stringScheme"
-    className: "storm.kafka.StringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
 ```
 
 ### Constructor Arguments, References, Properties and Configuration Methods
@@ -362,7 +362,7 @@ object by calling the constructor that takes a single string as an argument:
 
 ```yaml
   - id: "zkHosts"
-    className: "storm.kafka.ZkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
     constructorArgs:
       - "localhost:2181"
       - true
@@ -378,10 +378,10 @@ to another component's constructor:
 ```yaml
 components:
   - id: "stringScheme"
-    className: "storm.kafka.StringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
 
   - id: "stringMultiScheme"
-    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    className: "org.apache.storm.spout.SchemeAsMultiScheme"
     constructorArgs:
       - ref: "stringScheme" # component with id "stringScheme" must be declared above.
 ```
@@ -393,7 +393,7 @@ JavaBean-like setter methods and fields declared as `public`:
 
 ```yaml
   - id: "spoutConfig"
-    className: "storm.kafka.SpoutConfig"
+    className: "org.apache.storm.kafka.SpoutConfig"
     constructorArgs:
       # brokerHosts
       - ref: "zkHosts"
@@ -492,7 +492,7 @@ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
 
 ## Topology Config
 The `config` section is simply a map of Storm topology configuration parameters that will be passed to the
-`backtype.storm.StormSubmitter` as an instance of the `backtype.storm.Config` class:
+`org.apache.storm.StormSubmitter` as an instance of the `org.apache.storm.Config` class:
 
 ```yaml
 config:
@@ -537,7 +537,7 @@ topologySource:
 ```
 
 __N.B.:__ The specified method must accept a single argument of type `java.util.Map<String, Object>` or
-`backtype.storm.Config`, and return a `backtype.storm.generated.StormTopology` object.
+`org.apache.storm.Config`, and return a `org.apache.storm.generated.StormTopology` object.
 
 # YAML DSL
 ## Spouts and Bolts
@@ -568,21 +568,21 @@ Kafka spout example:
 ```yaml
 components:
   - id: "stringScheme"
-    className: "storm.kafka.StringScheme"
+    className: "org.apache.storm.kafka.StringScheme"
 
   - id: "stringMultiScheme"
-    className: "backtype.storm.spout.SchemeAsMultiScheme"
+    className: "org.apache.storm.spout.SchemeAsMultiScheme"
     constructorArgs:
       - ref: "stringScheme"
 
   - id: "zkHosts"
-    className: "storm.kafka.ZkHosts"
+    className: "org.apache.storm.kafka.ZkHosts"
     constructorArgs:
       - "localhost:2181"
 
 # Alternative kafka config
 #  - id: "kafkaConfig"
-#    className: "storm.kafka.KafkaConfig"
+#    className: "org.apache.storm.kafka.KafkaConfig"
 #    constructorArgs:
 #      # brokerHosts
 #      - ref: "zkHosts"
@@ -592,7 +592,7 @@ components:
 #      - "myKafkaClientId"
 
   - id: "spoutConfig"
-    className: "storm.kafka.SpoutConfig"
+    className: "org.apache.storm.kafka.SpoutConfig"
     constructorArgs:
       # brokerHosts
       - ref: "zkHosts"
@@ -614,7 +614,7 @@ config:
 # spout definitions
 spouts:
   - id: "kafka-spout"
-    className: "storm.kafka.KafkaSpout"
+    className: "org.apache.storm.kafka.KafkaSpout"
     constructorArgs:
       - ref: "spoutConfig"
 
@@ -641,7 +641,7 @@ bolts:
     # ...
 
   - id: "count"
-    className: "backtype.storm.testing.TestWordCounter"
+    className: "org.apache.storm.testing.TestWordCounter"
     parallelism: 1
     # ...
 ```
@@ -708,7 +708,7 @@ Custom stream groupings are defined by setting the grouping type to `CUSTOM` and
 that tells Flux how to instantiate the custom class. The `customClass` definition extends `component`, so it supports
 constructor arguments, references, and properties as well.
 
-The example below creates a Stream with an instance of the `backtype.storm.testing.NGrouping` custom stream grouping
+The example below creates a Stream with an instance of the `org.apache.storm.testing.NGrouping` custom stream grouping
 class.
 
 ```yaml
@@ -718,7 +718,7 @@ class.
     grouping:
       type: CUSTOM
       customClass:
-        className: "backtype.storm.testing.NGrouping"
+        className: "org.apache.storm.testing.NGrouping"
         constructorArgs:
           - 1
 ```
@@ -786,7 +786,7 @@ bolts:
     parallelism: 1
 
   - id: "count"
-    className: "backtype.storm.testing.TestWordCounter"
+    className: "org.apache.storm.testing.TestWordCounter"
     parallelism: 1
 
 #stream definitions
@@ -835,4 +835,4 @@ topologySource:
 
 ## Committer Sponsors
 
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
\ No newline at end of file
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
index 71c20a7..cdebd01 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java
@@ -17,13 +17,13 @@
  */
 package org.apache.storm.flux;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInitialStatus;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.SubmitOptions;
+import org.apache.storm.generated.TopologyInitialStatus;
+import org.apache.storm.utils.Utils;
 import org.apache.commons.cli.*;
 import org.apache.storm.flux.model.*;
 import org.apache.storm.flux.parser.FluxParser;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index 014116d..c16aa05 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -17,12 +17,12 @@
  */
 package org.apache.storm.flux;
 
-import backtype.storm.Config;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.*;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
 import org.apache.storm.flux.api.TopologySource;
 import org.apache.storm.flux.model.*;
 import org.slf4j.Logger;
@@ -35,7 +35,7 @@ public class FluxBuilder {
     private static Logger LOG = LoggerFactory.getLogger(FluxBuilder.class);
 
     /**
-     * Given a topology definition, return a populated `backtype.storm.Config` instance.
+     * Given a topology definition, return a populated `org.apache.storm.Config` instance.
      *
      * @param topologyDef
      * @return
@@ -103,7 +103,7 @@ public class FluxBuilder {
 
     /**
      * Given a `java.lang.Object` instance and a method name, attempt to find a method that matches the input
-     * parameter: `java.util.Map` or `backtype.storm.Config`.
+     * parameter: `java.util.Map` or `org.apache.storm.Config`.
      *
      * @param topologySource object to inspect for the specified method
      * @param methodName name of the method to look for

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
index fbccfb7..2777854 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/api/TopologySource.java
@@ -18,7 +18,7 @@
 package org.apache.storm.flux.api;
 
 
-import backtype.storm.generated.StormTopology;
+import org.apache.storm.generated.StormTopology;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
index e94b887..1520006 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ExecutionContext.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.flux.model;
 
-import backtype.storm.Config;
-import backtype.storm.task.IBolt;
-import backtype.storm.topology.IRichSpout;
+import org.apache.storm.Config;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.topology.IRichSpout;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
index dc9e6cb..bfac7dc 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/ObjectDef.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.flux.model;
 
-import backtype.storm.Config;
+import org.apache.storm.Config;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
index a6ae450..86614f1 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java
@@ -27,7 +27,7 @@ import java.util.*;
  *
  * It consists of the following:
  *   1. The topology name
- *   2. A `java.util.Map` representing the `backtype.storm.config` for the topology
+ *   2. A `java.util.Map` representing the `org.apache.storm.config` for the topology
  *   3. A list of spout definitions
  *   4. A list of bolt definitions
  *   5. A list of stream definitions that define the flow between spouts and bolts.

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 91a81f1..7a5ed7a 100644
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.flux;
 
-import backtype.storm.Config;
-import backtype.storm.generated.StormTopology;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
 import org.apache.storm.flux.model.ExecutionContext;
 import org.apache.storm.flux.model.TopologyDef;
 import org.apache.storm.flux.parser.FluxParser;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
index 981d6b0..8e3cda2 100644
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.flux.test;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.flux.api.TopologySource;
 import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
 import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
index 61eb113..2fadacf 100644
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.flux.test;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.flux.api.TopologySource;
 import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
 import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
index 39e2e3d..8b0aa05 100644
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.flux.test;
 
-import backtype.storm.Config;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
 import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
index f9f28c5..c8a7b85 100644
--- a/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
@@ -17,10 +17,10 @@
  */
 package org.apache.storm.flux.test;
 
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;