Posted to commits@storm.apache.org by bo...@apache.org on 2015/02/09 19:33:07 UTC

[1/4] storm git commit: STORM-637: Integrate PartialKeyGrouping into storm API

Repository: storm
Updated Branches:
  refs/heads/master 15f99fd0d -> 8036109f6


STORM-637: Integrate PartialKeyGrouping into storm API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dbefc0da
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dbefc0da
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dbefc0da

Branch: refs/heads/master
Commit: dbefc0da83cc7587766b39b215c7f2a21a5ef573
Parents: a115c9d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 30 15:40:42 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 30 15:47:11 2015 -0600

----------------------------------------------------------------------
 docs/documentation/Common-patterns.md           |  14 +-
 docs/documentation/Concepts.md                  |  13 +-
 .../storm/starter/SkewedRollingTopWords.java    | 134 +++++++++++++++++++
 .../storm/starter/bolt/RollingCountAggBolt.java |  78 +++++++++++
 .../coordination/BatchSubtopologyBuilder.java   |  11 ++
 .../storm/drpc/LinearDRPCInputDeclarer.java     |   5 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  13 +-
 .../storm/grouping/PartialKeyGrouping.java      |  31 ++++-
 .../backtype/storm/topology/InputDeclarer.java  |   3 +
 .../storm/topology/TopologyBuilder.java         |  11 ++
 .../TransactionalTopologyBuilder.java           |  13 +-
 .../topology/TridentTopologyBuilder.java        |  13 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |  26 +++-
 13 files changed, 348 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/docs/documentation/Common-patterns.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Common-patterns.md b/docs/documentation/Common-patterns.md
index 3d274f6..c11f5e9 100644
--- a/docs/documentation/Common-patterns.md
+++ b/docs/documentation/Common-patterns.md
@@ -64,7 +64,7 @@ A common continuous computation done on Storm is a "streaming top N" of some sor
 This approach obviously doesn't scale to large streams since the entire stream has to go through one task. A better way to do the computation is to do many top N's in parallel across partitions of the stream, and then merge those top N's together to get the global top N. The pattern looks like this:
 
 ```java
-builder.setBolt("rank", new RankObjects(), parallellism)
+builder.setBolt("rank", new RankObjects(), parallelism)
   .fieldsGrouping("objects", new Fields("value"));
 builder.setBolt("merge", new MergeObjects())
   .globalGrouping("rank");
@@ -72,6 +72,18 @@ builder.setBolt("merge", new MergeObjects())
 
 This pattern works because of the fields grouping done by the first bolt which gives the partitioning you need for this to be semantically correct. You can see an example of this pattern in storm-starter [here](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java).
 
+If, however, the data being processed has a known skew, it can be advantageous to use partialKeyGrouping instead of fieldsGrouping. This distributes the load for each key between two downstream bolt tasks instead of a single one.
+
+```java
+builder.setBolt("count", new CountObjects(), parallelism)
+  .partialKeyGrouping("objects", new Fields("value"));
+builder.setBolt("rank", new AggregateCountsAndRank(), parallelism)
+  .fieldsGrouping("count", new Fields("key"));
+builder.setBolt("merge", new MergeRanksObjects())
+  .globalGrouping("rank");
+``` 
+
+The topology needs an extra layer of processing to aggregate the partial counts from the upstream bolts, but this layer only processes aggregated values, so it is not subject to the load caused by the skewed data. You can see an example of this pattern in storm-starter [here](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java).
 
 ### TimeCacheMap for efficiently keeping a cache of things that have been recently updated
 

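The extra aggregation layer described in the Common-patterns change above can be sketched outside of Storm as follows. This is a minimal illustration, not code from the commit: the class name `PartialCountAggregator` and the method `update` are hypothetical, and plain `String` keys and `int` task ids stand in for tuple values and source tasks. Each upstream task reports its latest partial count for a key; the aggregator replaces that task's previous value and re-sums.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of merging partial counts per key.
final class PartialCountAggregator {
    // key -> (upstream task id -> latest partial count reported by that task)
    private final Map<String, Map<Integer, Long>> partials = new HashMap<>();

    /** Stores the latest partial count from one upstream task and returns the new total for the key. */
    long update(String key, int sourceTask, long partialCount) {
        Map<Integer, Long> perTask = partials.computeIfAbsent(key, k -> new HashMap<>());
        // Replace (do not add): each report is the task's current count, not a delta.
        perTask.put(sourceTask, partialCount);
        long total = 0;
        for (long c : perTask.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        PartialCountAggregator agg = new PartialCountAggregator();
        System.out.println(agg.update("storm", 3, 10L)); // prints 10
        System.out.println(agg.update("storm", 7, 4L));  // prints 14
        System.out.println(agg.update("storm", 3, 12L)); // prints 16
    }
}
```

Because a key is split across at most two upstream tasks, the inner map stays small, and the aggregator sees one update per emitted count rather than one per raw tuple.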
http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/docs/documentation/Concepts.md
----------------------------------------------------------------------
diff --git a/docs/documentation/Concepts.md b/docs/documentation/Concepts.md
index 7ce15da..827bb3a 100644
--- a/docs/documentation/Concepts.md
+++ b/docs/documentation/Concepts.md
@@ -84,11 +84,12 @@ There are seven built-in stream groupings in Storm, and you can implement a cust
 
 1. **Shuffle grouping**: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
 2. **Fields grouping**: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
-3. **All grouping**: The stream is replicated across all the bolt's tasks. Use this grouping with care.
-4. **Global grouping**: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
-5. **None grouping**: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
-6. **Direct grouping**: This is a special kind of grouping. A stream grouped this way means that the __producer__ of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](/apidocs/backtype/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided [TopologyContext](/apidocs/backtype/storm/task/TopologyContext.html) or by keeping track of the output of the `emit` method in [OutputCollector](/apidocs/backtype/storm/task/OutputCollector.html) (which returns the task ids that the tuple was sent to).  
-7. **Local or shuffle grouping**: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
+3. **Partial Key grouping**: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but is load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. [This paper](https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf) provides a good explanation of how it works and the advantages it provides.
+4. **All grouping**: The stream is replicated across all the bolt's tasks. Use this grouping with care.
+5. **Global grouping**: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
+6. **None grouping**: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
+7. **Direct grouping**: This is a special kind of grouping. A stream grouped this way means that the __producer__ of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](/apidocs/backtype/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided [TopologyContext](/apidocs/backtype/storm/task/TopologyContext.html) or by keeping track of the output of the `emit` method in [OutputCollector](/apidocs/backtype/storm/task/OutputCollector.html) (which returns the task ids that the tuple was sent to).  
+8. **Local or shuffle grouping**: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
 
 **Resources:**
 
@@ -114,4 +115,4 @@ Topologies execute across one or more worker processes. Each worker process is a
 
 **Resources:**
 
-* [Config.TOPOLOGY_WORKERS](/apidocs/backtype/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology
\ No newline at end of file
+* [Config.TOPOLOGY_WORKERS](/apidocs/backtype/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology

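The "two downstream bolts" idea in the Partial Key grouping entry above can be illustrated with a small stand-alone sketch. It is an assumption-laden illustration, not Storm's implementation: the class `TwoChoiceRouter` and its seeded hash are hypothetical (the real `PartialKeyGrouping` uses Guava's `murmur3_128` with two seeds). Each key hashes to two candidate tasks, and every tuple goes to whichever candidate has received fewer tuples so far.

```java
import java.util.Arrays;

// Hypothetical sketch of two-choice routing for a single key stream.
final class TwoChoiceRouter {
    private final long[] sent; // tuples sent so far to each downstream task

    TwoChoiceRouter(int numTasks) {
        sent = new long[numTasks];
    }

    // Illustrative seeded hash only; not the hash function Storm uses.
    private static int candidate(Object key, int seed, int numTasks) {
        int h = key.hashCode() * 31 + seed;
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, numTasks); // always in [0, numTasks)
    }

    /** Returns the index of the task chosen for this key and records the send. */
    int route(Object key) {
        int first = candidate(key, 13, sent.length);
        int second = candidate(key, 17, sent.length);
        // Prefer the less loaded of the two candidates; ties go to the first.
        int chosen = sent[first] > sent[second] ? second : first;
        sent[chosen]++;
        return chosen;
    }

    long[] load() {
        return sent.clone();
    }

    public static void main(String[] args) {
        TwoChoiceRouter router = new TwoChoiceRouter(8);
        for (int i = 0; i < 1000; i++) {
            router.route("hot-key");
        }
        // At most two entries are non-zero; if the candidates differ they split the load evenly.
        System.out.println(Arrays.toString(router.load()));
    }
}
```

Note that the two candidates can coincide for some keys, in which case that key behaves as it would under a fields grouping.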
http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
new file mode 100644
index 0000000..0ad8d60
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import org.apache.log4j.Logger;
+import storm.starter.bolt.IntermediateRankingsBolt;
+import storm.starter.bolt.RollingCountBolt;
+import storm.starter.bolt.RollingCountAggBolt;
+import storm.starter.bolt.TotalRankingsBolt;
+import storm.starter.util.StormRunner;
+
+/**
+ * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
+ * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
+ * like trending topics or trending images on Twitter. It takes an approach that assumes that some words will be much
+ * more common than other words, and uses partialKeyGrouping to better balance the skewed load.
+ */
+public class SkewedRollingTopWords {
+  private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
+  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+  private static final int TOP_N = 5;
+
+  private final TopologyBuilder builder;
+  private final String topologyName;
+  private final Config topologyConfig;
+  private final int runtimeInSeconds;
+
+  public SkewedRollingTopWords(String topologyName) throws InterruptedException {
+    builder = new TopologyBuilder();
+    this.topologyName = topologyName;
+    topologyConfig = createTopologyConfiguration();
+    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+    wireTopology();
+  }
+
+  private static Config createTopologyConfiguration() {
+    Config conf = new Config();
+    conf.setDebug(true);
+    return conf;
+  }
+
+  private void wireTopology() throws InterruptedException {
+    String spoutId = "wordGenerator";
+    String counterId = "counter";
+    String aggId = "aggregator";
+    String intermediateRankerId = "intermediateRanker";
+    String totalRankerId = "finalRanker";
+    builder.setSpout(spoutId, new TestWordSpout(), 5);
+    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word"));
+    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
+    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj"));
+    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+  }
+
+  public void runLocally() throws InterruptedException {
+    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+  }
+
+  public void runRemotely() throws Exception {
+    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+  }
+
+  /**
+   * Submits (runs) the topology.
+   *
+   * Usage: "SkewedRollingTopWords [topology-name] [local|remote]"
+   *
+   * By default, the topology is run locally under the name "slidingWindowCounts".
+   *
+   * Examples:
+   *
+   * <pre>
+   * {@code
+   *
+   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords foobar
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords foobar local
+   *
+   * # Runs in remote/cluster mode, with topology name "production-topology"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords production-topology remote
+   * }
+   * </pre>
+   *
+   * @param args First positional argument (optional) is topology name, second positional argument (optional) defines
+   *             whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote").
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    String topologyName = "slidingWindowCounts";
+    if (args.length >= 1) {
+      topologyName = args[0];
+    }
+    boolean runLocally = true;
+    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
+      runLocally = false;
+    }
+
+    LOG.info("Topology name: " + topologyName);
+    SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
+    if (runLocally) {
+      LOG.info("Running in local mode");
+      rtw.runLocally();
+    }
+    else {
+      LOG.info("Running in remote (cluster) mode");
+      rtw.runRemotely();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
new file mode 100644
index 0000000..e513b09
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.starter.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import storm.starter.tools.NthLastModifiedTimeTracker;
+import storm.starter.tools.SlidingWindowCounter;
+import storm.starter.util.TupleHelpers;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This bolt aggregates counts from multiple upstream bolts.
+ */
+public class RollingCountAggBolt extends BaseRichBolt {
+  private static final long serialVersionUID = 5537727428628598519L;
+  private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class);
+  //Mapping of key->upstreamBolt->count
+  private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>();
+  private OutputCollector collector;
+
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    Object obj = tuple.getValue(0);
+    long count = tuple.getLong(1);
+    int source = tuple.getSourceTask();
+    Map<Integer, Long> subCounts = counts.get(obj);
+    if (subCounts == null) {
+      subCounts = new HashMap<Integer, Long>();
+      counts.put(obj, subCounts);
+    }
+    //Update the current count for this object
+    subCounts.put(source, count);
+    //Output the sum of all the known counts so far for this key
+    long sum = 0;
+    for (Long val: subCounts.values()) {
+      sum += val;
+    }
+    collector.emit(new Values(obj, sum));
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("obj", "count"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
index 32258ed..f94c284 100644
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
@@ -22,6 +22,7 @@ import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.generated.Grouping;
 import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.topology.BaseConfigurationDeclarer;
 import backtype.storm.topology.BasicBoltExecutor;
 import backtype.storm.topology.BoltDeclarer;
@@ -374,6 +375,16 @@ public class BatchSubtopologyBuilder {
             });
             return this;
         }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
         
         @Override
         public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
index eeafc99..d03075e 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java
@@ -42,7 +42,10 @@ public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<
 
     public LinearDRPCInputDeclarer directGrouping();
     public LinearDRPCInputDeclarer directGrouping(String streamId);
-    
+
+    public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+    public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
+
     public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
     public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
     

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
index 75d75f3..a171a2c 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -28,6 +28,7 @@ import backtype.storm.coordination.IBatchBolt;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.StreamInfo;
 import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.topology.BaseConfigurationDeclarer;
 import backtype.storm.topology.BasicBoltExecutor;
 import backtype.storm.topology.BoltDeclarer;
@@ -347,7 +348,17 @@ public class LinearDRPCTopologyBuilder {
             });
             return this;
         }
-        
+
+        @Override
+        public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) {
+            return customGrouping(new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) {
+            return customGrouping(streamId, new PartialKeyGrouping(fields));
+        }
+
         @Override
         public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) {
             addDeclaration(new InputDeclaration() {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
index f36f4c6..d1f534b 100644
--- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
+++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java
@@ -18,12 +18,14 @@
 package backtype.storm.grouping;
 
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.tuple.Fields;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
@@ -34,20 +36,43 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
     private long[] targetTaskStats;
     private HashFunction h1 = Hashing.murmur3_128(13);
     private HashFunction h2 = Hashing.murmur3_128(17);
+    private Fields fields = null;
+    private Fields outFields = null;
+
+    public PartialKeyGrouping() {
+        //Empty
+    }
+
+    public PartialKeyGrouping(Fields fields) {
+        this.fields = fields;
+    }
 
     @Override
     public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
         this.targetTasks = targetTasks;
         targetTaskStats = new long[this.targetTasks.size()];
+        if (this.fields != null) {
+            this.outFields = context.getComponentOutputFields(stream);
+        }
     }
 
     @Override
     public List<Integer> chooseTasks(int taskId, List<Object> values) {
         List<Integer> boltIds = new ArrayList<Integer>(1);
         if (values.size() > 0) {
-            String str = values.get(0).toString(); // assume key is the first field
-            int firstChoice = (int) Math.abs(h1.hashBytes(str.getBytes()).asLong()) % this.targetTasks.size();
-            int secondChoice = (int) Math.abs(h2.hashBytes(str.getBytes()).asLong()) % this.targetTasks.size();
+            byte[] raw = null;
+            if (fields != null) {
+                List<Object> selectedFields = outFields.select(fields, values);
+                ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
+                for (Object o: selectedFields) {
+                    out.putInt(o.hashCode());
+                }
+                raw = out.array();
+            } else {
+                raw = values.get(0).toString().getBytes(); // assume key is the first field
+            }
+            int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size());
+            int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size());
             int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? secondChoice : firstChoice;
             boltIds.add(targetTasks.get(selected));
             targetTaskStats[selected]++;

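One edge case in the index computation of the hunk above may be worth noting: `Math.abs(Long.MIN_VALUE)` returns `Long.MIN_VALUE`, so `Math.abs(hash) % n` can still be negative for that single hash value. The sketch below contrasts that pattern with `Math.floorMod`, which never returns a negative result for a positive modulus. The class `SafeBucket` and both method names are hypothetical and are not part of the commit.

```java
// Hypothetical helper comparing two ways of mapping a 64-bit hash to a task index.
final class SafeBucket {
    /** Maps any long hash to an index in [0, buckets), assuming buckets > 0. */
    static int bucket(long hash, int buckets) {
        return (int) Math.floorMod(hash, (long) buckets);
    }

    /** The abs-then-mod pattern, kept only for comparison. */
    static int absThenMod(long hash, int buckets) {
        return (int) (Math.abs(hash) % buckets);
    }

    public static void main(String[] args) {
        System.out.println(absThenMod(Long.MIN_VALUE, 3)); // prints -2
        System.out.println(bucket(Long.MIN_VALUE, 3));     // prints 1
        System.out.println(bucket(-7L, 4));                // prints 1
    }
}
```

A negative index would only occur if a hash function returned exactly `Long.MIN_VALUE`, so this is unlikely in practice, but the `floorMod` form removes the case entirely.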
http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java b/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java
index 457fa35..ac0848f 100644
--- a/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java
+++ b/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java
@@ -45,6 +45,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
     public T directGrouping(String componentId);
     public T directGrouping(String componentId, String streamId);
 
+    public T partialKeyGrouping(String componentId, Fields fields);
+    public T partialKeyGrouping(String componentId, String streamId, Fields fields);
+
     public T customGrouping(String componentId, CustomStreamGrouping grouping);
     public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
     

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
index aebf995..0a47626 100644
--- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java
@@ -28,6 +28,7 @@ import backtype.storm.generated.SpoutSpec;
 import backtype.storm.generated.StateSpoutSpec;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 import java.util.ArrayList;
@@ -331,6 +332,16 @@ public class TopologyBuilder {
         }
 
         @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
         public BoltDeclarer customGrouping(String componentId, CustomStreamGrouping grouping) {
             return customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java
index 570522d..6619e07 100644
--- a/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java
+++ b/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java
@@ -28,6 +28,7 @@ import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.topology.BaseConfigurationDeclarer;
 import backtype.storm.topology.BasicBoltExecutor;
 import backtype.storm.topology.BoltDeclarer;
@@ -448,7 +449,17 @@ public class TransactionalTopologyBuilder {
             });
             return this;
         }
-        
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+
         @Override
         public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
             addDeclaration(new InputDeclaration() {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java b/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java
index 7b81ed9..498503b 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java
@@ -21,6 +21,7 @@ import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
 import backtype.storm.topology.BaseConfigurationDeclarer;
 import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
@@ -645,7 +646,17 @@ public class TridentTopologyBuilder {
             });
             return this;
         }
-        
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            return customGrouping(componentId, new PartialKeyGrouping(fields));
+        }
+
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
+        }
+
         @Override
         public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
             addDeclaration(new InputDeclaration() {

http://git-wip-us.apache.org/repos/asf/storm/blob/dbefc0da/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
index ad43869..4809b45 100644
--- a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
+++ b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java
@@ -17,19 +17,23 @@
  */
 package backtype.storm.grouping;
 
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.util.List;
 
 import org.junit.Test;
 
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.task.WorkerTopologyContext;
+import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 
 import com.google.common.collect.Lists;
 
 public class PartialKeyGroupingTest {
-
     @Test
     public void testChooseTasks() {
         PartialKeyGrouping pkg = new PartialKeyGrouping();
@@ -43,4 +47,20 @@ public class PartialKeyGroupingTest {
         assertThat(choice3, is(not(choice2)));
         assertThat(choice3, is(choice1));
     }
+
+    @Test
+    public void testChooseTasksFields() {
+        PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test"));
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new Fields("test"));
+        pkg.prepare(context, null, Lists.newArrayList(0, 1, 2, 3, 4, 5));
+        Values message = new Values("key1");
+        List<Integer> choice1 = pkg.chooseTasks(0, message);
+        assertThat(choice1.size(), is(1));
+        List<Integer> choice2 = pkg.chooseTasks(0, message);
+        assertThat(choice2, is(not(choice1)));
+        List<Integer> choice3 = pkg.chooseTasks(0, message);
+        assertThat(choice3, is(not(choice2)));
+        assertThat(choice3, is(choice1));
+    }
 }
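
Note on testChooseTasksFields above: the assertions expect repeated sends of the same key to alternate between two target tasks (choice2 differs from choice1, choice3 equals choice1). That pattern is what a "two candidate tasks per key, send to the less loaded one" scheme would produce. The following is a minimal, self-contained sketch of that idea only. It is not Storm's PartialKeyGrouping class; the class name TwoChoiceGroupingSketch, the method chooseTask, and the way the two candidates are derived from hashCode() are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; NOT the PartialKeyGrouping class touched by this commit.
class TwoChoiceGroupingSketch {
    private final List<Integer> targetTasks;
    private final long[] sent; // number of tuples sent so far to each target slot

    public TwoChoiceGroupingSketch(List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.sent = new long[targetTasks.size()];
    }

    // Picks one of two candidate slots for the key, preferring the less loaded one.
    public int chooseTask(Object key) {
        int n = targetTasks.size();
        int h = key.hashCode();
        int first = Math.floorMod(h, n);
        // Assumption for this sketch: derive the second candidate so that it always
        // differs from the first whenever more than one task is available.
        int second = (n == 1)
                ? first
                : Math.floorMod(first + 1 + Math.floorMod(h * 31 + 17, n - 1), n);
        // Ties go to the first candidate.
        int slot = sent[first] > sent[second] ? second : first;
        sent[slot]++;
        return targetTasks.get(slot);
    }

    public static void main(String[] args) {
        TwoChoiceGroupingSketch g =
                new TwoChoiceGroupingSketch(Arrays.asList(0, 1, 2, 3, 4, 5));
        int c1 = g.chooseTask("key1");
        int c2 = g.chooseTask("key1");
        int c3 = g.chooseTask("key1");
        // The same alternation the test asserts: c2 != c1 and c3 == c1.
        System.out.println((c1 != c2) + " " + (c3 == c1)); // prints "true true"
    }
}
```

In this sketch the first send goes to the first candidate because both counters are zero, the second send goes to the other candidate because the first is now more loaded, and the third send returns to the first candidate because the counters are tied again.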


[2/4] storm git commit: Merge branch 'partial-key-grouping' of https://github.com/revans2/incubator-storm into STORM-637

Posted by bo...@apache.org.
Merge branch 'partial-key-grouping' of https://github.com/revans2/incubator-storm into STORM-637

STORM-637: Integrate PartialKeyGrouping into storm API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0bc25fbf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0bc25fbf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0bc25fbf

Branch: refs/heads/master
Commit: 0bc25fbf990202d6838fb944891d425dd2ba4140
Parents: b920bae dbefc0d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 9 10:54:49 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 9 10:54:49 2015 -0600

----------------------------------------------------------------------
 docs/documentation/Common-patterns.md           |  14 +-
 docs/documentation/Concepts.md                  |  13 +-
 .../storm/starter/SkewedRollingTopWords.java    | 134 +++++++++++++++++++
 .../storm/starter/bolt/RollingCountAggBolt.java |  78 +++++++++++
 .../coordination/BatchSubtopologyBuilder.java   |  11 ++
 .../storm/drpc/LinearDRPCInputDeclarer.java     |   5 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  13 +-
 .../storm/grouping/PartialKeyGrouping.java      |  31 ++++-
 .../backtype/storm/topology/InputDeclarer.java  |   3 +
 .../storm/topology/TopologyBuilder.java         |  11 ++
 .../TransactionalTopologyBuilder.java           |  13 +-
 .../topology/TridentTopologyBuilder.java        |  13 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |  26 +++-
 13 files changed, 348 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: Merge branch 'STORM-637'

Posted by bo...@apache.org.
Merge branch 'STORM-637'

Conflicts:
	CHANGELOG.md


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8036109f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8036109f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8036109f

Branch: refs/heads/master
Commit: 8036109f68d77802cfd8c4a1e1bb14d4f9e9ff82
Parents: 15f99fd bb6f32d
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 9 12:32:46 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 9 12:32:46 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                    |   1 +
 docs/documentation/Common-patterns.md           |  14 +-
 docs/documentation/Concepts.md                  |  13 +-
 .../storm/starter/SkewedRollingTopWords.java    | 134 +++++++++++++++++++
 .../storm/starter/bolt/RollingCountAggBolt.java |  78 +++++++++++
 .../coordination/BatchSubtopologyBuilder.java   |  11 ++
 .../storm/drpc/LinearDRPCInputDeclarer.java     |   5 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  13 +-
 .../storm/grouping/PartialKeyGrouping.java      |  31 ++++-
 .../backtype/storm/topology/InputDeclarer.java  |   3 +
 .../storm/topology/TopologyBuilder.java         |  11 ++
 .../TransactionalTopologyBuilder.java           |  13 +-
 .../topology/TridentTopologyBuilder.java        |  13 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |  26 +++-
 14 files changed, 349 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8036109f/CHANGELOG.md
----------------------------------------------------------------------
diff --cc CHANGELOG.md
index 433f33f,98959bb..d0e4a03
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@@ -44,7 -44,7 +44,8 @@@
   * STORM-608: Storm UI CSRF escape characters not work correctly.
   * STORM-626: Add script to print out the merge command for a given pull request.
   * STORM-601: Make jira-github-join ignore case.
 + * STORM-667: Incorrect capitalization "SHell" in Multilang-protocol.md
+  * STORM-637: Integrate PartialKeyGrouping into storm API
  
  ## 0.9.3-rc2
   * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor


[3/4] storm git commit: Added STORM-637 to Changelog

Posted by bo...@apache.org.
Added STORM-637 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb6f32df
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb6f32df
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb6f32df

Branch: refs/heads/master
Commit: bb6f32dffaa3865b8b374c8fb1d7d7af48ebe4d4
Parents: 0bc25fb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Feb 9 10:55:26 2015 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Feb 9 10:55:26 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb6f32df/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 997927e..98959bb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,6 +44,7 @@
  * STORM-608: Storm UI CSRF escape characters not work correctly.
  * STORM-626: Add script to print out the merge command for a given pull request.
  * STORM-601: Make jira-github-join ignore case.
+ * STORM-637: Integrate PartialKeyGrouping into storm API
 
 ## 0.9.3-rc2
  * STORM-558: change "swap!" to "reset!" to fix assignment-versions in supervisor