You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/29 15:34:10 UTC

[1/4] storm git commit: Java implementation of partial key grouping + test

Repository: storm
Updated Branches:
  refs/heads/master 82caf6680 -> a115c9df8


Java implementation of partial key grouping + test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/095f56ee
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/095f56ee
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/095f56ee

Branch: refs/heads/master
Commit: 095f56eed6f9ea682f74bd6fd0d09e529cdc8490
Parents: 01e75ec
Author: Gianmarco De Francisci Morales <gd...@apache.org>
Authored: Fri Jan 23 15:36:08 2015 +0100
Committer: Gianmarco De Francisci Morales <gd...@apache.org>
Committed: Fri Jan 23 16:04:20 2015 +0100

----------------------------------------------------------------------
 storm-core/pom.xml                              |  2 +-
 .../storm/grouping/PartialKeyGrouping.java      | 40 ++++++++++++++++++++
 .../storm/grouping/PartialKeyGroupingTest.java  | 29 ++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/095f56ee/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 9b173f2..9f3ed49 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -282,7 +282,7 @@
         <dependency>
         	<groupId>junit</groupId>
         	<artifactId>junit</artifactId>
-        	<version>4.1</version>
+        	<version>4.11</version>
         	<scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/095f56ee/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
new file mode 100644
index 0000000..8d55c21
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -0,0 +1,71 @@
+package backtype.storm.grouping;
+
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+/**
+ * Stream grouping that hashes the first tuple field with two differently seeded hash functions and
+ * sends the tuple to whichever of the two candidate tasks this instance has sent fewer tuples to.
+ */
+public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
+    private static final long serialVersionUID = -447379837314000353L;
+    // Fixed charset so the key bytes, and therefore the two candidates, do not depend on the JVM default.
+    private static final Charset KEY_CHARSET = Charset.forName("UTF-8");
+    private List<Integer> targetTasks;
+    // Number of tuples this instance has routed to each task; indexed like targetTasks.
+    private long[] targetTaskStats;
+    private HashFunction h1 = Hashing.murmur3_128(13);
+    private HashFunction h2 = Hashing.murmur3_128(17);
+
+    /**
+     * Remembers the candidate target tasks and starts every per-task counter at zero.
+     *
+     * @param context not used by this grouping
+     * @param stream not used by this grouping
+     * @param targetTasks ids of the tasks that tuples may be routed to
+     */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = targetTasks;
+        targetTaskStats = new long[this.targetTasks.size()];
+    }
+
+    /**
+     * Picks the target task for one tuple and counts the tuple against that task.
+     *
+     * @param taskId not used by this grouping
+     * @param values tuple fields; the string form of the first one is treated as the key
+     * @return a single-element list holding the chosen task id, or an empty list if values is empty
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        List<Integer> boltIds = new ArrayList<Integer>(1);
+        if (values.size() > 0) {
+            byte[] raw = values.get(0).toString().getBytes(KEY_CHARSET); // assume key is the first field
+            int firstChoice = candidateIndex(h1, raw);
+            int secondChoice = candidateIndex(h2, raw);
+            // On a tie the first choice wins.
+            int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
+            boltIds.add(targetTasks.get(selected));
+            targetTaskStats[selected]++;
+        }
+        return boltIds;
+    }
+
+    /** Maps the hash of raw to an index in the range [0, targetTasks.size()). */
+    private int candidateIndex(HashFunction hash, byte[] raw) {
+        // Narrowing the long to int can produce a negative value, which makes the remainder negative.
+        // Taking the absolute value of the remainder (always smaller than the size) keeps it in bounds.
+        int folded = (int) Math.abs(hash.hashBytes(raw).asLong());
+        return Math.abs(folded % this.targetTasks.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/095f56ee/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
new file mode 100644
index 0000000..56ad60b
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
@@ -0,0 +1,41 @@
+package backtype.storm.grouping;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import backtype.storm.tuple.Values;
+
+import com.google.common.collect.Lists;
+
+/** Unit test for {@link PartialKeyGrouping}. */
+public class PartialKeyGroupingTest {
+
+    /**
+     * Routes the same key three times over six tasks and expects the choice to alternate.
+     * The expectations assume the two hash functions put "key1" on two different tasks.
+     */
+    @Test
+    public void testChooseTasks() {
+        List<Integer> tasks = Lists.newArrayList(0, 1, 2, 3, 4, 5);
+        PartialKeyGrouping grouping = new PartialKeyGrouping();
+        grouping.prepare(null, null, tasks);
+        Values tuple = new Values("key1");
+
+        // Nothing has been sent yet; exactly one task must come back.
+        List<Integer> first = grouping.chooseTasks(0, tuple);
+        assertThat(first.size(), is(1));
+
+        // The task picked first now has the higher count, so the other candidate is expected.
+        List<Integer> second = grouping.chooseTasks(0, tuple);
+        assertThat(second, is(not(first)));
+
+        // Both candidates have received one tuple; the third send is expected to return to the first pick.
+        List<Integer> third = grouping.chooseTasks(0, tuple);
+        assertThat(third, is(not(second)));
+        assertThat(third, is(first));
+    }
+}


[4/4] storm git commit: Updated README and Changelog with STORM-632

Posted by bo...@apache.org.
Updated README and Changelog with STORM-632


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a115c9df
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a115c9df
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a115c9df

Branch: refs/heads/master
Commit: a115c9df803543f6c2d378c747ae6a58ab4239ea
Parents: 6136a15
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jan 29 08:28:04 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jan 29 08:28:04 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md    | 1 +
 README.markdown | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a115c9df/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ae92e74..d6c886d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,7 @@
  * STORM-243: Record version and revision information in builds
  * STORM-630: Support for Clojure 1.6.0
  * STORM-629: Place Link to Source Code Repository on Webpage
+ * STORM-632: New grouping for better load balancing
 
 ## 0.9.3-rc2
  * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor

http://git-wip-us.apache.org/repos/asf/storm/blob/a115c9df/README.markdown
----------------------------------------------------------------------
diff --git a/README.markdown b/README.markdown
index c586575..7c6c26c 100644
--- a/README.markdown
+++ b/README.markdown
@@ -185,6 +185,7 @@ under the License.
 * Mansheng Yang ([@lightyang](https://github.com/lightyang))
 * Rick Kilgore ([@rick-kilgore](http://github.com/rick-kilgore))
 * 周向涛 ([@taojoe](https://github.com/taojoe))
+* Gianmarco De Francisci Morales ([@gdfm](https://github.com/gdfm))
 
 ## Acknowledgements
 


[3/4] storm git commit: Merge branch 'STORM-632' of https://github.com/gdfm/storm into STORM-632

Posted by bo...@apache.org.
Merge branch 'STORM-632' of https://github.com/gdfm/storm into STORM-632

STORM-632: New grouping for better load balancing


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6136a158
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6136a158
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6136a158

Branch: refs/heads/master
Commit: 6136a1586f6316d6e6c85a38e299c761fe59dda3
Parents: 82caf66 b0b6c5a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jan 29 08:26:54 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jan 29 08:26:54 2015 -0600

----------------------------------------------------------------------
 storm-core/pom.xml                              |  2 +-
 .../storm/grouping/PartialKeyGrouping.java      | 57 ++++++++++++++++++++
 .../storm/grouping/PartialKeyGroupingTest.java  | 46 ++++++++++++++++
 3 files changed, 104 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/4] storm git commit: add ASL headers

Posted by bo...@apache.org.
add ASL headers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0b6c5a7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0b6c5a7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0b6c5a7

Branch: refs/heads/master
Commit: b0b6c5a7ba04eeb6ffc2496770d169e11de7db92
Parents: 095f56e
Author: Gianmarco De Francisci Morales <gd...@apache.org>
Authored: Fri Jan 23 16:43:13 2015 +0100
Committer: Gianmarco De Francisci Morales <gd...@apache.org>
Committed: Fri Jan 23 16:43:13 2015 +0100

----------------------------------------------------------------------
 .../storm/grouping/PartialKeyGrouping.java         | 17 +++++++++++++++++
 .../storm/grouping/PartialKeyGroupingTest.java     | 17 +++++++++++++++++
 2 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b0b6c5a7/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
index 8d55c21..f36f4c6 100644
--- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package backtype.storm.grouping;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/b0b6c5a7/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
index 56ad60b..ad43869 100644
--- a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
+++ b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package backtype.storm.grouping;
 
 import static org.hamcrest.CoreMatchers.*;