You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/03/20 22:22:58 UTC

[35/50] [abbrv] git commit: AbstractRankerBolt: Emit defensive copy of Rankings object

AbstractRankerBolt: Emit defensive copy of Rankings object

This addresses ConcurrentModificationException that might be thrown in
the ranker bolts, when the Rankings object would be modified (by the
emitting bolt) after sending it downstream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/651ccfe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/651ccfe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/651ccfe0

Branch: refs/heads/master
Commit: 651ccfe07b542fd0b2c8c9a518fb6c1cd173bd9d
Parents: 74f01ec
Author: Michael G. Noll <mn...@verisign.com>
Authored: Sat Dec 7 12:26:11 2013 +0100
Committer: Michael G. Noll <mn...@verisign.com>
Committed: Sat Dec 7 12:54:08 2013 +0100

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  2 +-
 src/jvm/storm/starter/tools/Rankable.java       |  6 ++
 .../starter/tools/RankableObjectWithFields.java | 13 +++
 src/jvm/storm/starter/tools/Rankings.java       | 35 ++++++--
 storm-starter.iml                               | 82 ++++++++++++++++++
 .../tools/RankableObjectWithFieldsTest.java     | 21 +++++
 test/jvm/storm/starter/tools/RankingsTest.java  | 90 ++++++++++++++++++++
 7 files changed, 243 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 6e2551c..07ac843 100644
--- a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -73,7 +73,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
   abstract void updateRankingsWithTuple(Tuple tuple);
 
   private void emitRankings(BasicOutputCollector collector) {
-    collector.emit(new Values(rankings));
+    collector.emit(new Values(rankings.copy()));
     getLogger().debug("Rankings: " + rankings);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/src/jvm/storm/starter/tools/Rankable.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankable.java b/src/jvm/storm/starter/tools/Rankable.java
index 36ba086..254d920 100644
--- a/src/jvm/storm/starter/tools/Rankable.java
+++ b/src/jvm/storm/starter/tools/Rankable.java
@@ -6,4 +6,10 @@ public interface Rankable extends Comparable<Rankable> {
 
   long getCount();
 
+  /**
+   * Note: We do not defensively copy the object wrapped by the Rankable.  It is passed as is.
+   *
+   * @return a defensive copy
+   */
+  Rankable copy();
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/src/jvm/storm/starter/tools/RankableObjectWithFields.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/RankableObjectWithFields.java b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
index 3b27d0d..3079434 100644
--- a/src/jvm/storm/starter/tools/RankableObjectWithFields.java
+++ b/src/jvm/storm/starter/tools/RankableObjectWithFields.java
@@ -115,4 +115,17 @@ public class RankableObjectWithFields implements Rankable, Serializable {
     buf.append("]");
     return buf.toString();
   }
+
+  /**
+   * Note: We do not defensively copy the wrapped object and any accompanying fields.  We do guarantee, however,
+   * do return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
+   *
+   * @return
+   */
+  @Override
+  public Rankable copy() {
+    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
+    return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/src/jvm/storm/starter/tools/Rankings.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/tools/Rankings.java b/src/jvm/storm/starter/tools/Rankings.java
index 5ff471a..5076bf6 100644
--- a/src/jvm/storm/starter/tools/Rankings.java
+++ b/src/jvm/storm/starter/tools/Rankings.java
@@ -1,5 +1,6 @@
 package storm.starter.tools;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import java.io.Serializable;
@@ -26,6 +27,15 @@ public class Rankings implements Serializable {
   }
 
   /**
+   * Copy constructor.
+   * @param other
+   */
+  public Rankings(Rankings other) {
+    this(other.maxSize());
+    updateWith(other);
+  }
+
+  /**
    * @return the maximum possible number (size) of ranked objects this instance can hold
    */
   public int maxSize() {
@@ -39,12 +49,20 @@ public class Rankings implements Serializable {
     return rankedItems.size();
   }
 
+  /**
+   * The returned defensive copy is only "somewhat" defensive.  We do, for instance, return a defensive copy of the
+   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too.  However, the
+   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
+   * a Rankable will be defensively copied, too.
+   *
+   * @return a somewhat defensive copy of ranked items
+   */
   public List<Rankable> getRankings() {
-    return defensiveCopyOf(rankedItems);
-  }
-
-  private List<Rankable> defensiveCopyOf(List<Rankable> list) {
-    return Lists.newArrayList(rankedItems);
+    List<Rankable> copy = Lists.newLinkedList();
+    for (Rankable r: rankedItems) {
+      copy.add(r.copy());
+    }
+    return ImmutableList.copyOf(copy);
   }
 
   public void updateWith(Rankings other) {
@@ -111,4 +129,11 @@ public class Rankings implements Serializable {
   public String toString() {
     return rankedItems.toString();
   }
+
+  /**
+   * Creates a (defensive) copy of itself.
+   */
+  public Rankings copy() {
+    return new Rankings(this);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/storm-starter.iml
----------------------------------------------------------------------
diff --git a/storm-starter.iml b/storm-starter.iml
new file mode 100644
index 0000000..a4e5116
--- /dev/null
+++ b/storm-starter.iml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_6" inherit-compiler-output="false">
+    <output url="file://$MODULE_DIR$/target/classes" />
+    <output-test url="file://$MODULE_DIR$/target/test-classes" />
+    <exclude-output />
+    <content url="file://$MODULE_DIR$">
+      <sourceFolder url="file://$MODULE_DIR$/src/jvm" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/multilang" isTestSource="false" />
+      <sourceFolder url="file://$MODULE_DIR$/test/jvm" isTestSource="true" />
+      <excludeFolder url="file://$MODULE_DIR$/target" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.testng:testng:6.8.5" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.beanshell:bsh:2.0b4" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: com.beust:jcommander:1.27" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.yaml:snakeyaml:1.6" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-all:1.9.0" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-assert-core:2.0M8" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.easytesting:fest-util:1.2.3" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.jmock:jmock:2.6.0" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
+    <orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-library:1.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm:0.9.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-console-logging:0.9.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-core:0.9.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:clojure:1.4.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-io:commons-io:1.4" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.commons:commons-exec:1.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:libthrift7:0.7.0-2" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-lang:commons-lang:2.5" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpclient:4.1.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.httpcomponents:httpcore:4.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-codec:commons-codec:1.4" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: clj-time:clj-time:0.4.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: joda-time:joda-time:2.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-framework:1.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.netflix.curator:curator-client:1.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:slf4j-api:1.6.5" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.apache.zookeeper:zookeeper:3.3.3" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: jline:jline:0.9.94" level="project" />
+    <orderEntry type="library" name="Maven: com.google.guava:guava:15.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: backtype:jzmq:2.1.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.json-simple:json-simple:1.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: compojure:compojure:1.1.3" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:core.incubator:0.1.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.macro:0.1.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: clout:clout:1.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-core:1.1.5" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: commons-fileupload:commons-fileupload:1.2.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: hiccup:hiccup:0.3.6" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-devel:0.3.11" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: clj-stacktrace:clj-stacktrace:0.2.2" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-jetty-adapter:0.3.11" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ring:ring-servlet:0.3.11" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty:6.1.26" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:jetty-util:6.1.26" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.mortbay.jetty:servlet-api:2.5-20081211" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:tools.logging:0.2.3" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.clojure:math.numeric-tower:0.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:carbonite:1.5.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.kryo:kryo:2.17" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.reflectasm:reflectasm:shaded:1.07" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.ow2.asm:asm:4.0" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.esotericsoftware.minlog:minlog:1.2" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.objenesis:objenesis:1.2" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:tools.cli:0.2.2" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: com.googlecode.disruptor:disruptor:2.10.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:jgrapht:0.8.3" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-classic:1.0.6" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: ch.qos.logback:logback-core:1.0.6" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: org.slf4j:log4j-over-slf4j:1.6.6" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: storm:storm-netty:0.9.0.1" level="project" />
+    <orderEntry type="library" scope="PROVIDED" name="Maven: io.netty:netty:3.6.3.Final" level="project" />
+    <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
+  </component>
+</module>
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java b/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
index c8a14cc..82a89e6 100644
--- a/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
+++ b/test/jvm/storm/starter/tools/RankableObjectWithFieldsTest.java
@@ -211,4 +211,25 @@ public class RankableObjectWithFieldsTest {
     assertThat(r.getFields()).isEqualTo(fields);
 
   }
+
+  @DataProvider
+  public Object[][] copyData() {
+    return new Object[][]{ { new RankableObjectWithFields("foo", 0) }, { new RankableObjectWithFields("foo", 3,
+        "someOtherField") }, { new RankableObjectWithFields("foo", 0, "someField") } };
+  }
+
+  // TODO: What would be a good test to ensure that RankableObjectWithFields is at least somewhat defensively copied?
+  //       The contract of Rankable#copy() returns a Rankable value, not a RankableObjectWithFields.
+  @Test(dataProvider = "copyData")
+  public void copyShouldReturnCopy(RankableObjectWithFields original) {
+    // given
+
+    // when
+    Rankable copy = original.copy();
+
+    // then
+    assertThat(copy.getObject()).isEqualTo(original.getObject());
+    assertThat(copy.getCount()).isEqualTo(original.getCount());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/651ccfe0/test/jvm/storm/starter/tools/RankingsTest.java
----------------------------------------------------------------------
diff --git a/test/jvm/storm/starter/tools/RankingsTest.java b/test/jvm/storm/starter/tools/RankingsTest.java
index 7d3b4da..e1f8290 100644
--- a/test/jvm/storm/starter/tools/RankingsTest.java
+++ b/test/jvm/storm/starter/tools/RankingsTest.java
@@ -36,6 +36,57 @@ public class RankingsTest {
   }
 
   @DataProvider
+  public Object[][] copyRankingsData() {
+    return new Object[][]{ { 5, Lists.newArrayList(A, B, C) }, { 2, Lists.newArrayList(A, B, C, D) },
+        { 1, Lists.newArrayList() }, { 1, Lists.newArrayList(A) }, { 1, Lists.newArrayList(A, B) } };
+  }
+
+  @Test(dataProvider = "copyRankingsData")
+  public void copyConstructorShouldReturnCopy(int topN, List<Rankable> rankables) {
+    // given
+    Rankings rankings = new Rankings(topN);
+    for (Rankable r : rankables) {
+      rankings.updateWith(r);
+    }
+
+    // when
+    Rankings copy = new Rankings(rankings);
+
+    // then
+    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
+    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
+  }
+
+  @DataProvider
+  public Object[][] defensiveCopyRankingsData() {
+    return new Object[][]{ { 5, Lists.newArrayList(A, B, C), Lists.newArrayList(D) }, { 2, Lists.newArrayList(A, B, C,
+        D), Lists.newArrayList(E, F) }, { 1, Lists.newArrayList(), Lists.newArrayList(A) }, { 1, Lists.newArrayList(A),
+        Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO), Lists.newArrayList(B) }, { 1, Lists.newArrayList(ZERO),
+        Lists.newArrayList() } };
+  }
+
+  @Test(dataProvider = "defensiveCopyRankingsData")
+  public void copyConstructorShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
+    // given
+    Rankings original = new Rankings(topN);
+    for (Rankable r : rankables) {
+      original.updateWith(r);
+    }
+    int expSize = original.size();
+    List<Rankable> expRankings = original.getRankings();
+
+    // when
+    Rankings copy = new Rankings(original);
+    for (Rankable r : changes) {
+      copy.updateWith(r);
+    }
+
+    // then
+    assertThat(original.size()).isEqualTo(expSize);
+    assertThat(original.getRankings()).isEqualTo(expRankings);
+  }
+
+  @DataProvider
   public Object[][] legalTopNData() {
     return new Object[][]{ { 1 }, { 2 }, { 1000 }, { 1000000 } };
   }
@@ -258,4 +309,43 @@ public class RankingsTest {
     }
     assertThat(exceptions).isEmpty();
   }
+
+  @Test(dataProvider = "copyRankingsData")
+  public void copyShouldReturnCopy(int topN, List<Rankable> rankables) {
+    // given
+    Rankings rankings = new Rankings(topN);
+    for (Rankable r : rankables) {
+      rankings.updateWith(r);
+    }
+
+    // when
+    Rankings copy = rankings.copy();
+
+    // then
+    assertThat(copy.maxSize()).isEqualTo(rankings.maxSize());
+    assertThat(copy.getRankings()).isEqualTo(rankings.getRankings());
+  }
+
+  @Test(dataProvider = "defensiveCopyRankingsData")
+  public void copyShouldReturnDefensiveCopy(int topN, List<Rankable> rankables, List<Rankable> changes) {
+    // given
+    Rankings original = new Rankings(topN);
+    for (Rankable r : rankables) {
+      original.updateWith(r);
+    }
+    int expSize = original.size();
+    List<Rankable> expRankings = original.getRankings();
+
+    // when
+    Rankings copy = original.copy();
+    for (Rankable r : changes) {
+      copy.updateWith(r);
+    }
+    copy.pruneZeroCounts();
+
+    // then
+    assertThat(original.size()).isEqualTo(expSize);
+    assertThat(original.getRankings()).isEqualTo(expRankings);
+  }
+
 }
\ No newline at end of file