You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/02/17 03:40:04 UTC

[1/4] storm git commit: min/max operators implementation in Trident streams API.

Repository: storm
Updated Branches:
  refs/heads/master 82bea75c9 -> 46999909a


min/max operators implementation in Trident streams API.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5295a909
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5295a909
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5295a909

Branch: refs/heads/master
Commit: 5295a909b1fc8c136a220a0e636c0d70c036e5c5
Parents: 31b57e8
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Fri Jan 29 12:39:26 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Mon Feb 8 11:14:41 2016 +0530

----------------------------------------------------------------------
 .../TridentMinMaxOperationsTopology.java        | 208 +++++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    | 117 +++++++++--
 .../operation/builtin/ComparisonAggregator.java |  72 +++++++
 .../storm/trident/operation/builtin/Max.java    |  43 ++++
 .../operation/builtin/MaxWithComparator.java    |  44 ++++
 .../storm/trident/operation/builtin/Min.java    |  44 ++++
 .../operation/builtin/MinWithComparator.java    |  44 ++++
 .../trident/testing/NumberGeneratorSpout.java   |  92 ++++++++
 8 files changed, 651 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
new file mode 100644
index 0000000..dedaaff
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.NumberGeneratorSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class contains different usages of minBy, maxBy, min and max operations on trident streams.
+ *
+ */
+public class TridentMinMaxOperationsTopology {
+    public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for (String word : sentence.split(" ")) {
+                collector.emit(new Values(word));
+            }
+        }
+    }
+
+    public static StormTopology buildIdsTopology() {
+        NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000);
+
+        TridentTopology topology = new TridentTopology();
+        Stream wordsStream = topology.newStream("numgen-spout", spout).
+                each(new Fields("id"), new Debug("##### ids"));
+
+        wordsStream.minBy("id").
+                each(new Fields("id"), new Debug("#### min-id"));
+
+        wordsStream.maxBy("id").
+                each(new Fields("id"), new Debug("#### max-id"));
+
+        return topology.build();
+    }
+
+    public static StormTopology buildWordsTopology() {
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16).
+                each(new Fields("sentence"), new Split(), new Fields("word")).
+                each(new Fields("word"), new Debug("##### words"));
+
+        wordsStream.minBy("word").
+                each(new Fields("word"), new Debug("#### lowest word"));
+
+        wordsStream.maxBy("word").
+                each(new Fields("word"), new Debug("#### highest word"));
+
+        return topology.build();
+    }
+
+    public static StormTopology buildVehiclesTopology() {
+
+        FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(new Fields("vehicle"), new Debug("##### vehicles"));
+
+        vehiclesStream.min(new SpeedComparator())
+                .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
+                .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver"));
+
+        vehiclesStream.max(new SpeedComparator())
+                .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
+                .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver"));
+
+        vehiclesStream.max(new EfficiencyComparator()).
+                each(new Fields("vehicle"), new Debug("#### efficient vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()};
+        if (args.length == 0) {
+            for (StormTopology topology : topologies) {
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology("min-max-topology", conf, topology);
+                Utils.sleep(60*1000);
+                cluster.shutdown();
+            }
+            System.exit(0);
+        } else {
+            conf.setNumWorkers(3);
+            int ct=1;
+            for (StormTopology topology : topologies) {
+                StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology);
+            }
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable {
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable {
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) {
+            List<Object>[] vehicles = new List[count];
+            for(int i=0; i<count; i++) {
+                int id = i-1;
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-"+id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 7c6d93f..fa62b72 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -19,31 +19,33 @@ package org.apache.storm.trident;
 
 import org.apache.storm.generated.Grouping;
 import org.apache.storm.generated.NullStruct;
-import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.trident.operation.Consumer;
-import org.apache.storm.trident.operation.FlatMapFunction;
-import org.apache.storm.trident.operation.MapFunction;
-import org.apache.storm.trident.operation.impl.ConsumerExecutor;
-import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
-import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
-import org.apache.storm.trident.planner.processor.MapProcessor;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
 import org.apache.storm.trident.fluent.GlobalAggregationScheme;
 import org.apache.storm.trident.fluent.GroupedStream;
 import org.apache.storm.trident.fluent.IAggregatableStream;
 import org.apache.storm.trident.operation.Aggregator;
 import org.apache.storm.trident.operation.Assembly;
 import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.operation.Consumer;
 import org.apache.storm.trident.operation.Filter;
+import org.apache.storm.trident.operation.FlatMapFunction;
 import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
 import org.apache.storm.trident.operation.ReducerAggregator;
+import org.apache.storm.trident.operation.builtin.ComparisonAggregator;
+import org.apache.storm.trident.operation.builtin.Max;
+import org.apache.storm.trident.operation.builtin.MaxWithComparator;
+import org.apache.storm.trident.operation.builtin.Min;
+import org.apache.storm.trident.operation.builtin.MinWithComparator;
 import org.apache.storm.trident.operation.impl.CombinerAggStateUpdater;
+import org.apache.storm.trident.operation.impl.ConsumerExecutor;
 import org.apache.storm.trident.operation.impl.FilterExecutor;
+import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
 import org.apache.storm.trident.operation.impl.GlobalBatchToPartition;
-import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
 import org.apache.storm.trident.operation.impl.IndexHashBatchToPartition;
+import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
+import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
 import org.apache.storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
 import org.apache.storm.trident.operation.impl.TrueFilter;
 import org.apache.storm.trident.partition.GlobalGrouping;
@@ -55,6 +57,7 @@ import org.apache.storm.trident.planner.PartitionNode;
 import org.apache.storm.trident.planner.ProcessorNode;
 import org.apache.storm.trident.planner.processor.AggregateProcessor;
 import org.apache.storm.trident.planner.processor.EachProcessor;
+import org.apache.storm.trident.planner.processor.MapProcessor;
 import org.apache.storm.trident.planner.processor.PartitionPersistProcessor;
 import org.apache.storm.trident.planner.processor.ProjectedProcessor;
 import org.apache.storm.trident.planner.processor.StateQueryProcessor;
@@ -62,7 +65,12 @@ import org.apache.storm.trident.state.QueryFunction;
 import org.apache.storm.trident.state.StateFactory;
 import org.apache.storm.trident.state.StateSpec;
 import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.Comparator;
 
 /**
  * A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -436,8 +444,91 @@ public class Stream implements IAggregatableStream {
         return chainedAgg()
                .partitionAggregate(inputFields, agg, functionFields)
                .chainEnd();
-    }  
-    
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
+     * assumed that its value is an instance of {@code Comparable}.
+     *
+     * @param inputFieldName input field name
+     * @return
+     */
+    public Stream minBy(String inputFieldName) {
+        Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
+        return comparableAggregateStream(inputFieldName, min);
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
+     * using the given {@code comparator}.
+     *
+     * @param inputFieldName input field name
+     * @param comparator comparator used for finding the minimum of two tuple values of {@code inputFieldName}.
+     * @param <T> type of tuple's given input field value.
+     * @return
+     */
+    public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
+        Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
+        return comparableAggregateStream(inputFieldName, min);
+    }
+
+    /**
+     * This aggregator operation computes the minimum of tuples in a stream by using the given {@code comparator} with
+     * {@code TridentTuple}s.
+     *
+     * @param comparator comparator used for finding the minimum of two tuples.
+     * @return
+     */
+    public Stream min(Comparator<TridentTuple> comparator) {
+        Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
+        return comparableAggregateStream(null, min);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
+     * assumed that its value is an instance of {@code Comparable}.
+     *
+     * @param inputFieldName input field name
+     * @return
+     */
+    public Stream maxBy(String inputFieldName) {
+        Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
+        return comparableAggregateStream(inputFieldName, max);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
+     * using the given {@code comparator}.
+     *
+     * @param inputFieldName input field name
+     * @param comparator comparator used for finding the maximum of two tuple values of {@code inputFieldName}.
+     * @param <T> type of tuple's given input field value.
+     * @return
+     */
+    public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
+        Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
+        return comparableAggregateStream(inputFieldName, max);
+    }
+
+    /**
+     * This aggregator operation computes the maximum of tuples in a stream by using the given {@code comparator} with
+     * {@code TridentTuple}s.
+     *
+     * @param comparator comparator used for finding the maximum of two tuples.
+     * @return
+     */
+    public Stream max(Comparator<TridentTuple> comparator) {
+        Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator);
+        return comparableAggregateStream(null, max);
+    }
+
+    private <T> Stream comparableAggregateStream(String inputFieldName, Aggregator<T> aggregator) {
+        if(inputFieldName != null) {
+            projectionValidation(new Fields(inputFieldName));
+        }
+        return partitionAggregate(getOutputFields(), aggregator, getOutputFields());
+    }
+
     public Stream aggregate(Aggregator agg, Fields functionFields) {
         return aggregate(null, agg, functionFields);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
new file mode 100644
index 0000000..0109bb5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Abstract {@code Aggregator} for comparing two values in a stream.
+ *
+ */
+public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> {
+
+    public static class State {
+        TridentTuple previousTuple;
+    }
+
+    private final String inputFieldName;
+
+    public ComparisonAggregator(String inputFieldName) {
+        this.inputFieldName = inputFieldName;
+    }
+
+    protected abstract T compare(T value1, T value2);
+
+    @Override
+    public State init(Object batchId, TridentCollector collector) {
+        return new State();
+    }
+
+    @Override
+    public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
+        T value1 = valueFromTuple(state.previousTuple);
+        T value2 = valueFromTuple(tuple);
+
+        if(value2 == null) {
+            return;
+        }
+
+        if(value1 == null || compare(value1, value2) == value2) {
+            state.previousTuple = tuple;
+        }
+
+    }
+
+    protected T valueFromTuple(TridentTuple tuple) {
+        // when there is no input field then the whole tuple is considered for comparison.
+        return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple);
+    }
+
+    @Override
+    public void complete(State state, TridentCollector collector) {
+        if (state.previousTuple != null) { collector.emit(state.previousTuple.getValues()); } // no tuple seen: nothing to emit
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
new file mode 100644
index 0000000..5385dfb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It compares tuples by the value of the given
+ * input field and assumes that value is an instance of {@code Comparable}.
+ *
+ */
+public class Max extends ComparisonAggregator<Comparable<Object>> {
+
+    public Max(String inputFieldName) {
+        super(inputFieldName);
+    }
+
+    @Override
+    protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
+        return value1.compareTo(value2) > 0 ? value1 : value2;
+    }
+
+    /**
+     * Returns an aggregator that computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one
+     * value and it is an instance of {@code Comparable}.
+     *
+     * @return
+     */
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
new file mode 100644
index 0000000..172aa58
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It uses given {@code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MaxWithComparator<T> extends ComparisonAggregator<T> {
+    private final Comparator<T> comparator;
+    
+    public MaxWithComparator(Comparator<T> comparator) {
+        this(null, comparator);
+    }
+
+    public MaxWithComparator(String inputFieldName, Comparator<T> comparator) {
+        super(inputFieldName);
+        this.comparator = comparator;
+    }
+
+    @Override
+    protected T compare(T value1, T value2) {
+        return comparator.compare(value1, value2) > 0 ? value1 : value2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
new file mode 100644
index 0000000..0757d7c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It compares tuples by the value of the given
+ * input field and assumes that value is an instance of {@code Comparable}.
+ *
+ */
+public class Min extends ComparisonAggregator<Comparable<Object>> {
+
+    public Min(String inputFieldName) {
+        super(inputFieldName);
+    }
+
+    @Override
+    protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
+        return value1.compareTo(value2) < 0 ? value1 : value2;
+    }
+
+    /**
+     * Returns an aggregator that computes the minimum of aggregated tuples in a stream. It assumes that the tuple has one
+     * value and it is an instance of {@code Comparable}.
+     *
+     * @return
+     * @param inputFieldName
+     */
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
new file mode 100644
index 0000000..d33e000
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It uses given {@code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MinWithComparator<T> extends ComparisonAggregator<T> {
+    private final Comparator<T> comparator;
+
+    public MinWithComparator(String inputFieldName, Comparator<T> comparator) {
+        super(inputFieldName);
+        this.comparator = comparator;
+    }
+
+    public MinWithComparator(Comparator<T> comparator) {
+        this(null, comparator);
+    }
+
+    @Override
+    protected T compare(T value1, T value2) {
+        return comparator.compare(value1, value2) < 0 ? value1 : value2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
new file mode 100644
index 0000000..a4a9a79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.testing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Batch spout emitting {@code batchSize} single-value tuples per batch, each a random int in {@code [0, maxNumber]}; generated batches are cached by batch id until acked.
+ */
+public class NumberGeneratorSpout implements IBatchSpout {
+    private final Fields fields;
+    private final int batchSize;
+    private final int maxNumber;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>(); // tuples generated per batch id, removed on ack
+
+    public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
+        this.fields = fields;
+        this.batchSize = batchSize;
+        this.maxNumber = maxNumber;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) { // no-op
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values = null;
+        if(batches.containsKey(batchId)) { // batch id seen before: emit the cached tuples again instead of new random ones
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1))); // bound is exclusive, hence + 1
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId); // the cached tuples of this batch are dropped once it is acked
+    }
+
+    @Override
+    public void close() { // no-op
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1); // caps the max task parallelism configured for this component at 1
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}


[4/4] storm git commit: Added STORM-1511 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1511 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46999909
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46999909
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46999909

Branch: refs/heads/master
Commit: 46999909a7c2f1a8b3e436f314971e9a21467547
Parents: 68c87b5
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Feb 16 18:39:13 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:39:13 2016 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/46999909/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 36aa9d5..f50d05f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1511: min/max operators implementation in Trident streams API.
  * STORM-1263: port backtype.storm.command.kill-topology to java
  * STORM-1260: port backtype.storm.command.activate to java
  * STORM-1261: port backtype.storm.command.deactivate to java


[3/4] storm git commit: Merge branch 'min-max' of http://github.com/satishd/storm into STORM-1511

Posted by sr...@apache.org.
Merge branch 'min-max' of http://github.com/satishd/storm into STORM-1511


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68c87b5e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68c87b5e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68c87b5e

Branch: refs/heads/master
Commit: 68c87b5eb2d254586fabfc81f5ed7addbb9fdc68
Parents: 82bea75 bc263cb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Feb 16 18:35:03 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:35:03 2016 -0800

----------------------------------------------------------------------
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 +++++++++++++++++
 .../jvm/org/apache/storm/trident/Stream.java    | 121 +++++++++--
 .../operation/builtin/ComparisonAggregator.java |  91 +++++++++
 .../storm/trident/operation/builtin/Max.java    |  37 ++++
 .../operation/builtin/MaxWithComparator.java    |  51 +++++
 .../storm/trident/operation/builtin/Min.java    |  36 ++++
 .../operation/builtin/MinWithComparator.java    |  51 +++++
 9 files changed, 850 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: Addressed review comments

Posted by sr...@apache.org.
Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc263cba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc263cba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc263cba

Branch: refs/heads/master
Commit: bc263cba67283b0c1ebe95be49e137fad86b3978
Parents: 5295a90
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Mon Feb 8 16:49:46 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Feb 9 10:31:18 2016 +0530

----------------------------------------------------------------------
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 ++++++++++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 ++++++++++++++++
 .../TridentMinMaxOperationsTopology.java        | 208 -------------------
 .../jvm/org/apache/storm/trident/Stream.java    |  24 ++-
 .../operation/builtin/ComparisonAggregator.java |  23 +-
 .../storm/trident/operation/builtin/Max.java    |   6 -
 .../operation/builtin/MaxWithComparator.java    |   7 +
 .../storm/trident/operation/builtin/Min.java    |   8 -
 .../operation/builtin/MinWithComparator.java    |   7 +
 .../trident/testing/NumberGeneratorSpout.java   |  92 --------
 11 files changed, 525 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
new file mode 100644
index 0000000..1d1b082
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.spout;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This spout generates random whole numbers, one per field in {@code fields}, each in the range {@code [0, maxNumber]}.
+ * Every batch holds {@code batchSize} tuples and is cached by batch id until acked, so the same id re-emits the same tuples.
+ */
+public class RandomNumberGeneratorSpout implements IBatchSpout {
+    private final Fields fields;
+    private final int batchSize;
+    private final int maxNumber;
+    private final Map<Long, List<List<Object>>> batches = new HashMap<>(); // tuples generated per batch id, removed on ack
+
+    public RandomNumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
+        this.fields = fields;
+        this.batchSize = batchSize;
+        this.maxNumber = maxNumber;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context) { // no-op
+    }
+
+    @Override
+    public void emitBatch(long batchId, TridentCollector collector) {
+        List<List<Object>> values = null;
+        if(batches.containsKey(batchId)) { // batch id seen before: emit the cached tuples again instead of new random ones
+            values = batches.get(batchId);
+        } else {
+            values = new ArrayList<>();
+            for (int i = 0; i < batchSize; i++) {
+                List<Object> numbers = new ArrayList<>();
+                for (int x=0; x<fields.size(); x++) { // one random value for every declared output field
+                    numbers.add(ThreadLocalRandom.current().nextInt(0, maxNumber + 1)); // bound is exclusive, hence + 1
+                }
+                values.add(numbers);
+            }
+            batches.put(batchId, values);
+        }
+        for (List<Object> value : values) {
+            collector.emit(value);
+        }
+    }
+
+    @Override
+    public void ack(long batchId) {
+        batches.remove(batchId); // the cached tuples of this batch are dropped once it is acked
+    }
+
+    @Override
+    public void close() { // no-op
+
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config conf = new Config();
+        conf.setMaxTaskParallelism(1); // caps the max task parallelism configured for this component at 1
+        return conf;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return fields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
new file mode 100644
index 0000000..d985436
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of the
+ * {@link Stream#minBy(String)} and {@link Stream#maxBy(String)}
+ * operations on trident {@link Stream}. {@code main} only submits {@code buildDevicesTopology()};
+ * {@code buildVehiclesTopology()} is defined here as well but is not invoked from within this class.
+ */
+public class TridentMinMaxOfDevicesTopology {
+
+    /**
+     * Creates a topology with device-id and count (which are whole numbers) as tuple fields in a stream and it finally
+     * generates result streams based on min of the device-id values and max of the count values.
+     */
+    public static StormTopology buildDevicesTopology() {
+        String deviceID = "device-id";
+        String count = "count";
+        Fields allFields = new Fields(deviceID, count);
+
+        RandomNumberGeneratorSpout spout = new RandomNumberGeneratorSpout(allFields, 10, 1000); // batchSize = 10, maxNumber = 1000
+
+        TridentTopology topology = new TridentTopology();
+        Stream devicesStream = topology.newStream("devicegen-spout", spout).
+                each(allFields, new Debug("##### devices"));
+
+        devicesStream.minBy(deviceID).
+                each(allFields, new Debug("#### device with min id"));
+
+        devicesStream.maxBy(count).
+                each(allFields, new Debug("#### device with max count"));
+
+        return topology.build();
+    }
+
+    /**
+     * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields
+     * with values {@link TridentMinMaxOfDevicesTopology.Vehicle} and {@link TridentMinMaxOfDevicesTopology.Driver} respectively.
+     */
+    public static StormTopology buildVehiclesTopology() {
+        Fields driverField = new Fields(Driver.FIELD_NAME);
+        Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+        Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+
+        Stream slowVehiclesStream =
+                vehiclesStream
+                        .min(new SpeedComparator())
+                        .each(vehicleField, new Debug("#### slowest vehicle"));
+
+        Stream slowDriversStream = // NOTE(review): assigned but not read again in this method
+                slowVehiclesStream
+                        .project(driverField)
+                        .each(driverField, new Debug("##### slowest driver"));
+
+        vehiclesStream
+                .max(new SpeedComparator())
+                .each(vehicleField, new Debug("#### fastest vehicle"))
+                .project(driverField)
+                .each(driverField, new Debug("##### fastest driver"));
+
+        vehiclesStream
+                .max(new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### efficient vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        StormTopology topology = buildDevicesTopology();
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) { // no arguments: run inside an in-process LocalCluster
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("devices-topology", conf, topology);
+            Utils.sleep(60 * 1000); // presumably milliseconds, i.e. keep the topology running for about a minute
+            cluster.shutdown();
+            System.exit(0);
+        } else { // any argument present: submit through StormSubmitter with 3 workers
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, Serializable { // orders tuples by the maxSpeed of their "vehicle" field
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable { // orders tuples by the efficiency of their "vehicle" field
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable { // value object stored in the "driver" tuple field
+        static final String FIELD_NAME = "driver";
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable { // value object stored in the "vehicle" tuple field
+        static final String FIELD_NAME = "vehicle";
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) { // builds count (Vehicle, Driver) value lists with random speed and efficiency
+            List<Object>[] vehicles = new List[count];
+            for (int i = 0; i < count; i++) {
+                int id = i - 1; // NOTE(review): ids therefore start at -1 for i == 0 — confirm this offset is intended
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-" + id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
new file mode 100644
index 0000000..192b198
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of the following operations on a trident {@link Stream}:
+ * {@link Stream#minBy(String, Comparator)},
+ * {@link Stream#min(Comparator)},
+ * {@link Stream#maxBy(String, Comparator)} and
+ * {@link Stream#max(Comparator)}.
+ * The tuples carry a "vehicle" and a "driver" field whose values are built by {@code Vehicle.generateVehicles(int)}.
+ */
+public class TridentMinMaxOfVehiclesTopology {
+
+    /**
+     * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields
+     * with values {@link TridentMinMaxOfVehiclesTopology.Vehicle} and {@link TridentMinMaxOfVehiclesTopology.Driver} respectively.
+     */
+    public static StormTopology buildVehiclesTopology() {
+        Fields driverField = new Fields(Driver.FIELD_NAME);
+        Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+        Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+        FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream vehiclesStream = topology.newStream("spout1", spout).
+                each(allFields, new Debug("##### vehicles"));
+
+        Stream slowVehiclesStream =
+                vehiclesStream
+                        .min(new SpeedComparator())
+                        .each(vehicleField, new Debug("#### slowest vehicle"));
+
+        Stream slowDriversStream = // NOTE(review): assigned but not read again in this method
+                slowVehiclesStream
+                        .project(driverField)
+                        .each(driverField, new Debug("##### slowest driver"));
+
+        vehiclesStream
+                .max(new SpeedComparator())
+                .each(vehicleField, new Debug("#### fastest vehicle"))
+                .project(driverField)
+                .each(driverField, new Debug("##### fastest driver"));
+
+        vehiclesStream
+                .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### least efficient vehicle"));
+
+        vehiclesStream
+                .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+                each(vehicleField, new Debug("#### most efficient vehicle"));
+
+        return topology.build();
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        StormTopology topology = buildVehiclesTopology();
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if (args.length == 0) { // no arguments: run inside an in-process LocalCluster
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("vehicles-topology", conf, topology);
+            Utils.sleep(60 * 1000); // presumably milliseconds, i.e. keep the topology running for about a minute
+            cluster.shutdown();
+            System.exit(0);
+        } else { // any argument present: submit through StormSubmitter with 3 workers
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
+        }
+    }
+
+    static class SpeedComparator implements Comparator<TridentTuple>, Serializable { // orders whole tuples by the maxSpeed of their "vehicle" field
+
+        @Override
+        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+        }
+    }
+
+    static class EfficiencyComparator implements Comparator<Vehicle>, Serializable { // orders Vehicle values (not tuples) by efficiency
+
+        @Override
+        public int compare(Vehicle vehicle1, Vehicle vehicle2) {
+            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+        }
+
+    }
+
+    static class Driver implements Serializable { // value object stored in the "driver" tuple field
+        static final String FIELD_NAME = "driver";
+        final String name;
+        final int id;
+
+        Driver(String name, int id) {
+            this.name = name;
+            this.id = id;
+        }
+
+        @Override
+        public String toString() {
+            return "Driver{" +
+                    "name='" + name + '\'' +
+                    ", id=" + id +
+                    '}';
+        }
+    }
+
+    static class Vehicle implements Serializable { // value object stored in the "vehicle" tuple field
+        static final String FIELD_NAME = "vehicle";
+        final String name;
+        final int maxSpeed;
+        final double efficiency;
+
+        public Vehicle(String name, int maxSpeed, double efficiency) {
+            this.name = name;
+            this.maxSpeed = maxSpeed;
+            this.efficiency = efficiency;
+        }
+
+        @Override
+        public String toString() {
+            return "Vehicle{" +
+                    "name='" + name + '\'' +
+                    ", maxSpeed=" + maxSpeed +
+                    ", efficiency=" + efficiency +
+                    '}';
+        }
+
+        public static List<Object>[] generateVehicles(int count) { // builds count (Vehicle, Driver) value lists with random speed and efficiency
+            List<Object>[] vehicles = new List[count];
+            for (int i = 0; i < count; i++) {
+                int id = i - 1; // NOTE(review): ids therefore start at -1 for i == 0 — confirm this offset is intended
+                vehicles[i] =
+                        (new Values(
+                                new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+                                new Driver("Driver-" + id, id)
+                        ));
+            }
+            return vehicles;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
deleted file mode 100644
index dedaaff..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.starter.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.builtin.Debug;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.trident.testing.NumberGeneratorSpout;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * This class contains different usages of minBy, maxBy, min and max operations on trident streams.
- *
- */
-public class TridentMinMaxOperationsTopology {
-    public static class Split extends BaseFunction {
-        @Override
-        public void execute(TridentTuple tuple, TridentCollector collector) {
-            String sentence = tuple.getString(0);
-            for (String word : sentence.split(" ")) {
-                collector.emit(new Values(word));
-            }
-        }
-    }
-
-    public static StormTopology buildIdsTopology() {
-        NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000);
-
-        TridentTopology topology = new TridentTopology();
-        Stream wordsStream = topology.newStream("numgen-spout", spout).
-                each(new Fields("id"), new Debug("##### ids"));
-
-        wordsStream.minBy("id").
-                each(new Fields("id"), new Debug("#### min-id"));
-
-        wordsStream.maxBy("id").
-                each(new Fields("id"), new Debug("#### max-id"));
-
-        return topology.build();
-    }
-
-    public static StormTopology buildWordsTopology() {
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
-                new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
-                new Values("how many apples can you eat"), new Values("to be or not to be the person"));
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16).
-                each(new Fields("sentence"), new Split(), new Fields("word")).
-                each(new Fields("word"), new Debug("##### words"));
-
-        wordsStream.minBy("word").
-                each(new Fields("word"), new Debug("#### lowest word"));
-
-        wordsStream.maxBy("word").
-                each(new Fields("word"), new Debug("#### highest word"));
-
-        return topology.build();
-    }
-
-    public static StormTopology buildVehiclesTopology() {
-
-        FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20));
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream vehiclesStream = topology.newStream("spout1", spout).
-                each(new Fields("vehicle"), new Debug("##### vehicles"));
-
-        vehiclesStream.min(new SpeedComparator())
-                .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
-                .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver"));
-
-        vehiclesStream.max(new SpeedComparator())
-                .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
-                .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver"));
-
-        vehiclesStream.max(new EfficiencyComparator()).
-                each(new Fields("vehicle"), new Debug("#### efficient vehicle"));
-
-        return topology.build();
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()};
-        if (args.length == 0) {
-            for (StormTopology topology : topologies) {
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology("min-max-topology", conf, topology);
-                Utils.sleep(60*1000);
-                cluster.shutdown();
-            }
-            System.exit(0);
-        } else {
-            conf.setNumWorkers(3);
-            int ct=1;
-            for (StormTopology topology : topologies) {
-                StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology);
-            }
-        }
-    }
-
-    static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
-
-        @Override
-        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
-            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
-            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
-            return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
-        }
-    }
-
-    static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
-
-        @Override
-        public int compare(TridentTuple tuple1, TridentTuple tuple2) {
-            Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
-            Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
-            return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
-        }
-
-    }
-
-    static class Driver implements Serializable {
-        final String name;
-        final int id;
-
-        Driver(String name, int id) {
-            this.name = name;
-            this.id = id;
-        }
-
-        @Override
-        public String toString() {
-            return "Driver{" +
-                    "name='" + name + '\'' +
-                    ", id=" + id +
-                    '}';
-        }
-    }
-
-    static class Vehicle implements Serializable {
-        final String name;
-        final int maxSpeed;
-        final double efficiency;
-
-        public Vehicle(String name, int maxSpeed, double efficiency) {
-            this.name = name;
-            this.maxSpeed = maxSpeed;
-            this.efficiency = efficiency;
-        }
-
-        @Override
-        public String toString() {
-            return "Vehicle{" +
-                    "name='" + name + '\'' +
-                    ", maxSpeed=" + maxSpeed +
-                    ", efficiency=" + efficiency +
-                    '}';
-        }
-
-        public static List<Object>[] generateVehicles(int count) {
-            List<Object>[] vehicles = new List[count];
-            for(int i=0; i<count; i++) {
-                int id = i-1;
-                vehicles[i] =
-                        (new Values(
-                                new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
-                                new Driver("Driver-"+id, id)
-                        ));
-            }
-            return vehicles;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index fa62b72..d313678 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -448,10 +448,11 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
-     * assumed that its value is an instance of {@code Comparable}.
+     * assumed that its value is an instance of {@code Comparable}. If the value of the tuple's {@code inputFieldName} field is not an
+     * instance of {@code Comparable}, a {@code ClassCastException} is thrown.
      *
      * @param inputFieldName input field name
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream minBy(String inputFieldName) {
         Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
@@ -460,12 +461,13 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
-     * using the given {@code comparator}.
+     * using the given {@code comparator}. If the value of the tuple's {@code inputFieldName} field is not an
+     * instance of {@code T}, a {@code ClassCastException} is thrown.
      *
      * @param inputFieldName input field name
      * @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}.
      * @param <T> type of tuple's given input field value.
-     * @return
+     * @return the new stream with this operation.
      */
     public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
         Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
@@ -477,7 +479,7 @@ public class Stream implements IAggregatableStream {
      * {@code TridentTuple}s.
      *
      * @param comparator comparator used in for finding minimum of two tuple values.
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream min(Comparator<TridentTuple> comparator) {
         Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
@@ -486,10 +488,11 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
-     * assumed that its value is an instance of {@code Comparable}.
+     * assumed that its value is an instance of {@code Comparable}. If the value of the tuple's {@code inputFieldName} field is not an
+     * instance of {@code Comparable}, a {@code ClassCastException} is thrown.
      *
      * @param inputFieldName input field name
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream maxBy(String inputFieldName) {
         Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
@@ -498,12 +501,13 @@ public class Stream implements IAggregatableStream {
 
     /**
      * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
-     * using the given {@code comparator}.
+     * using the given {@code comparator}. If the value of the tuple's {@code inputFieldName} field is not an
+     * instance of {@code T}, a {@code ClassCastException} is thrown.
      *
      * @param inputFieldName input field name
      * @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}.
      * @param <T> type of tuple's given input field value.
-     * @return
+     * @return the new stream with this operation.
      */
     public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
         Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
@@ -515,7 +519,7 @@ public class Stream implements IAggregatableStream {
      * {@code TridentTuple}s.
      *
      * @param comparator comparator used in for finding maximum of two tuple values.
-     * @return
+     * @return the new stream with this operation.
      */
     public Stream max(Comparator<TridentTuple> comparator) {
         Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
index 0109bb5..82b657a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -21,12 +21,16 @@ package org.apache.storm.trident.operation.builtin;
 import org.apache.storm.trident.operation.BaseAggregator;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract {@code Aggregator} for comparing two values in a stream.
  *
  */
 public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> {
+    private static final Logger log = LoggerFactory.getLogger(ComparisonAggregator.class);
+    private Object batchId;
 
     public static class State {
         TridentTuple previousTuple;
@@ -42,6 +46,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
 
     @Override
     public State init(Object batchId, TridentCollector collector) {
+        this.batchId = batchId;
+        log.debug("Started comparison aggregation for batch: [{}] in operation [{}]", batchId, this);
         return new State();
     }
 
@@ -50,6 +56,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
         T value1 = valueFromTuple(state.previousTuple);
         T value2 = valueFromTuple(tuple);
 
+        log.debug("Aggregated tuple value in state [{}], and received tuple value [{}] in operation [{}]", value1, value2, this);
+
         if(value2 == null) {
             return;
         }
@@ -62,11 +70,22 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
 
     protected T valueFromTuple(TridentTuple tuple) {
         // when there is no input field then the whole tuple is considered for comparison.
-        return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple);
+        Object value = null;
+        if (inputFieldName != null && tuple != null) {
+            value =  tuple.getValueByField(inputFieldName);
+        } else {
+            value = tuple;
+        }
+
+        log.debug("value from tuple is [{}] with input field [{}] and tuple [{}]", value, inputFieldName, tuple);
+
+        return (T) value;
     }
 
     @Override
     public void complete(State state, TridentCollector collector) {
-        collector.emit(state.previousTuple.getValues());
+        log.debug("Completed comparison aggregation for batch [{}] with resultant tuple: [{}] in operation [{}]", batchId, state.previousTuple, this);
+
+        collector.emit(state.previousTuple != null ? state.previousTuple.getValues() : null);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
index 5385dfb..f1221b0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -34,10 +34,4 @@ public class Max extends ComparisonAggregator<Comparable<Object>> {
         return value1.compareTo(value2) > 0 ? value1 : value2;
     }
 
-    /**
-     * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
-     * it is an instance of {@code Comparable}.
-     *
-     * @return
-     */
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
index 172aa58..0e8ae90 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -41,4 +41,11 @@ public class MaxWithComparator<T> extends ComparisonAggregator<T> {
     protected T compare(T value1, T value2) {
         return comparator.compare(value1, value2) > 0 ? value1 : value2;
     }
+
+    @Override
+    public String toString() {
+        return "MaxWithComparator{" +
+                "comparator=" + comparator +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
index 0757d7c..010a919 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -33,12 +33,4 @@ public class Min extends ComparisonAggregator<Comparable<Object>> {
     protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
         return value1.compareTo(value2) < 0 ? value1 : value2;
     }
-
-    /**
-     * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
-     * it is an instance of {@code Comparable}.
-     *
-     * @return
-     * @param inputFieldName
-     */
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
index d33e000..64144cb 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -41,4 +41,11 @@ public class MinWithComparator<T> extends ComparisonAggregator<T> {
     protected T compare(T value1, T value2) {
         return comparator.compare(value1, value2) < 0 ? value1 : value2;
     }
+
+    @Override
+    public String toString() {
+        return "MinWithComparator{" +
+                "comparator=" + comparator +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
deleted file mode 100644
index a4a9a79..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.testing;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- *
- */
-public class NumberGeneratorSpout implements IBatchSpout {
-    private final Fields fields;
-    private final int batchSize;
-    private final int maxNumber;
-    private final Map<Long, List<List<Object>>> batches = new HashMap<>();
-
-    public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
-        this.fields = fields;
-        this.batchSize = batchSize;
-        this.maxNumber = maxNumber;
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context) {
-    }
-
-    @Override
-    public void emitBatch(long batchId, TridentCollector collector) {
-        List<List<Object>> values = null;
-        if(batches.containsKey(batchId)) {
-            values = batches.get(batchId);
-        } else {
-            values = new ArrayList<>();
-            for (int i = 0; i < batchSize; i++) {
-                values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1)));
-            }
-            batches.put(batchId, values);
-        }
-        for (List<Object> value : values) {
-            collector.emit(value);
-        }
-    }
-
-    @Override
-    public void ack(long batchId) {
-        batches.remove(batchId);
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return fields;
-    }
-}