You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/02/17 03:40:04 UTC
[1/4] storm git commit: min/max operators implementation in Trident
streams API.
Repository: storm
Updated Branches:
refs/heads/master 82bea75c9 -> 46999909a
min/max operators implementation in Trident streams API.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5295a909
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5295a909
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5295a909
Branch: refs/heads/master
Commit: 5295a909b1fc8c136a220a0e636c0d70c036e5c5
Parents: 31b57e8
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Fri Jan 29 12:39:26 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Mon Feb 8 11:14:41 2016 +0530
----------------------------------------------------------------------
.../TridentMinMaxOperationsTopology.java | 208 +++++++++++++++++++
.../jvm/org/apache/storm/trident/Stream.java | 117 +++++++++--
.../operation/builtin/ComparisonAggregator.java | 72 +++++++
.../storm/trident/operation/builtin/Max.java | 43 ++++
.../operation/builtin/MaxWithComparator.java | 44 ++++
.../storm/trident/operation/builtin/Min.java | 44 ++++
.../operation/builtin/MinWithComparator.java | 44 ++++
.../trident/testing/NumberGeneratorSpout.java | 92 ++++++++
8 files changed, 651 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
new file mode 100644
index 0000000..dedaaff
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.NumberGeneratorSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class contains different usages of minBy, maxBy, min and max operations on trident streams.
+ *
+ */
+public class TridentMinMaxOperationsTopology {
+ public static class Split extends BaseFunction {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ String sentence = tuple.getString(0);
+ for (String word : sentence.split(" ")) {
+ collector.emit(new Values(word));
+ }
+ }
+ }
+
+ public static StormTopology buildIdsTopology() {
+ NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000);
+
+ TridentTopology topology = new TridentTopology();
+ Stream wordsStream = topology.newStream("numgen-spout", spout).
+ each(new Fields("id"), new Debug("##### ids"));
+
+ wordsStream.minBy("id").
+ each(new Fields("id"), new Debug("#### min-id"));
+
+ wordsStream.maxBy("id").
+ each(new Fields("id"), new Debug("#### max-id"));
+
+ return topology.build();
+ }
+
+ public static StormTopology buildWordsTopology() {
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
+ new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
+ new Values("how many apples can you eat"), new Values("to be or not to be the person"));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16).
+ each(new Fields("sentence"), new Split(), new Fields("word")).
+ each(new Fields("word"), new Debug("##### words"));
+
+ wordsStream.minBy("word").
+ each(new Fields("word"), new Debug("#### lowest word"));
+
+ wordsStream.maxBy("word").
+ each(new Fields("word"), new Debug("#### highest word"));
+
+ return topology.build();
+ }
+
+ public static StormTopology buildVehiclesTopology() {
+
+ FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream vehiclesStream = topology.newStream("spout1", spout).
+ each(new Fields("vehicle"), new Debug("##### vehicles"));
+
+ vehiclesStream.min(new SpeedComparator())
+ .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
+ .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver"));
+
+ vehiclesStream.max(new SpeedComparator())
+ .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
+ .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver"));
+
+ vehiclesStream.max(new EfficiencyComparator()).
+ each(new Fields("vehicle"), new Debug("#### efficient vehicle"));
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()};
+ if (args.length == 0) {
+ for (StormTopology topology : topologies) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("min-max-topology", conf, topology);
+ Utils.sleep(60*1000);
+ cluster.shutdown();
+ }
+ System.exit(0);
+ } else {
+ conf.setNumWorkers(3);
+ int ct=1;
+ for (StormTopology topology : topologies) {
+ StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology);
+ }
+ }
+ }
+
+ static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
+
+ @Override
+ public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+ Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+ Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+ return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+ }
+ }
+
+ static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
+
+ @Override
+ public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+ Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
+ Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
+ return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+ }
+
+ }
+
+ static class Driver implements Serializable {
+ final String name;
+ final int id;
+
+ Driver(String name, int id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "Driver{" +
+ "name='" + name + '\'' +
+ ", id=" + id +
+ '}';
+ }
+ }
+
+ static class Vehicle implements Serializable {
+ final String name;
+ final int maxSpeed;
+ final double efficiency;
+
+ public Vehicle(String name, int maxSpeed, double efficiency) {
+ this.name = name;
+ this.maxSpeed = maxSpeed;
+ this.efficiency = efficiency;
+ }
+
+ @Override
+ public String toString() {
+ return "Vehicle{" +
+ "name='" + name + '\'' +
+ ", maxSpeed=" + maxSpeed +
+ ", efficiency=" + efficiency +
+ '}';
+ }
+
+ public static List<Object>[] generateVehicles(int count) {
+ List<Object>[] vehicles = new List[count];
+ for(int i=0; i<count; i++) {
+ int id = i-1;
+ vehicles[i] =
+ (new Values(
+ new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+ new Driver("Driver-"+id, id)
+ ));
+ }
+ return vehicles;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index 7c6d93f..fa62b72 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -19,31 +19,33 @@ package org.apache.storm.trident;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.NullStruct;
-import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.grouping.CustomStreamGrouping;
-import org.apache.storm.trident.operation.Consumer;
-import org.apache.storm.trident.operation.FlatMapFunction;
-import org.apache.storm.trident.operation.MapFunction;
-import org.apache.storm.trident.operation.impl.ConsumerExecutor;
-import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
-import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
-import org.apache.storm.trident.planner.processor.MapProcessor;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
+import org.apache.storm.trident.fluent.ChainedAggregatorDeclarer;
import org.apache.storm.trident.fluent.GlobalAggregationScheme;
import org.apache.storm.trident.fluent.GroupedStream;
import org.apache.storm.trident.fluent.IAggregatableStream;
import org.apache.storm.trident.operation.Aggregator;
import org.apache.storm.trident.operation.Assembly;
import org.apache.storm.trident.operation.CombinerAggregator;
+import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.operation.Filter;
+import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.MapFunction;
import org.apache.storm.trident.operation.ReducerAggregator;
+import org.apache.storm.trident.operation.builtin.ComparisonAggregator;
+import org.apache.storm.trident.operation.builtin.Max;
+import org.apache.storm.trident.operation.builtin.MaxWithComparator;
+import org.apache.storm.trident.operation.builtin.Min;
+import org.apache.storm.trident.operation.builtin.MinWithComparator;
import org.apache.storm.trident.operation.impl.CombinerAggStateUpdater;
+import org.apache.storm.trident.operation.impl.ConsumerExecutor;
import org.apache.storm.trident.operation.impl.FilterExecutor;
+import org.apache.storm.trident.operation.impl.FlatMapFunctionExecutor;
import org.apache.storm.trident.operation.impl.GlobalBatchToPartition;
-import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
import org.apache.storm.trident.operation.impl.IndexHashBatchToPartition;
+import org.apache.storm.trident.operation.impl.MapFunctionExecutor;
+import org.apache.storm.trident.operation.impl.ReducerAggStateUpdater;
import org.apache.storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
import org.apache.storm.trident.operation.impl.TrueFilter;
import org.apache.storm.trident.partition.GlobalGrouping;
@@ -55,6 +57,7 @@ import org.apache.storm.trident.planner.PartitionNode;
import org.apache.storm.trident.planner.ProcessorNode;
import org.apache.storm.trident.planner.processor.AggregateProcessor;
import org.apache.storm.trident.planner.processor.EachProcessor;
+import org.apache.storm.trident.planner.processor.MapProcessor;
import org.apache.storm.trident.planner.processor.PartitionPersistProcessor;
import org.apache.storm.trident.planner.processor.ProjectedProcessor;
import org.apache.storm.trident.planner.processor.StateQueryProcessor;
@@ -62,7 +65,12 @@ import org.apache.storm.trident.state.QueryFunction;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateSpec;
import org.apache.storm.trident.state.StateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.util.TridentUtils;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.Comparator;
/**
* A Stream represents the core data model in Trident, and can be thought of as a "stream" of tuples that are processed
@@ -436,8 +444,91 @@ public class Stream implements IAggregatableStream {
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
- }
-
+ }
+
+ /**
+ * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
+ * assumed that its value is an instance of {@code Comparable}.
+ *
+ * @param inputFieldName input field name
+ * @return
+ */
+ public Stream minBy(String inputFieldName) {
+ Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
+ return comparableAggregateStream(inputFieldName, min);
+ }
+
+ /**
+ * This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
+ * using the given {@code comparator}.
+ *
+ * @param inputFieldName input field name
+ * @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}.
+ * @param <T> type of tuple's given input field value.
+ * @return
+ */
+ public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
+ Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
+ return comparableAggregateStream(inputFieldName, min);
+ }
+
+ /**
+ * This aggregator operation computes the minimum of tuples in a stream by using the given {@code comparator} with
+ * {@code TridentTuple}s.
+ *
+ * @param comparator comparator used in for finding minimum of two tuple values.
+ * @return
+ */
+ public Stream min(Comparator<TridentTuple> comparator) {
+ Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
+ return comparableAggregateStream(null, min);
+ }
+
+ /**
+ * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
+ * assumed that its value is an instance of {@code Comparable}.
+ *
+ * @param inputFieldName input field name
+ * @return
+ */
+ public Stream maxBy(String inputFieldName) {
+ Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
+ return comparableAggregateStream(inputFieldName, max);
+ }
+
+ /**
+ * This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
+ * using the given {@code comparator}.
+ *
+ * @param inputFieldName input field name
+ * @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}.
+ * @param <T> type of tuple's given input field value.
+ * @return
+ */
+ public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
+ Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
+ return comparableAggregateStream(inputFieldName, max);
+ }
+
+ /**
+ * This aggregator operation computes the maximum of tuples in a stream by using the given {@code comparator} with
+ * {@code TridentTuple}s.
+ *
+ * @param comparator comparator used in for finding maximum of two tuple values.
+ * @return
+ */
+ public Stream max(Comparator<TridentTuple> comparator) {
+ Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator);
+ return comparableAggregateStream(null, max);
+ }
+
+ private <T> Stream comparableAggregateStream(String inputFieldName, Aggregator<T> aggregator) {
+ if(inputFieldName != null) {
+ projectionValidation(new Fields(inputFieldName));
+ }
+ return partitionAggregate(getOutputFields(), aggregator, getOutputFields());
+ }
+
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
new file mode 100644
index 0000000..0109bb5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * Abstract {@code Aggregator} for comparing two values in a stream.
+ *
+ */
+public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> {
+
+ public static class State {
+ TridentTuple previousTuple;
+ }
+
+ private final String inputFieldName;
+
+ public ComparisonAggregator(String inputFieldName) {
+ this.inputFieldName = inputFieldName;
+ }
+
+ protected abstract T compare(T value1, T value2);
+
+ @Override
+ public State init(Object batchId, TridentCollector collector) {
+ return new State();
+ }
+
+ @Override
+ public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
+ T value1 = valueFromTuple(state.previousTuple);
+ T value2 = valueFromTuple(tuple);
+
+ if(value2 == null) {
+ return;
+ }
+
+ if(value1 == null || compare(value1, value2) == value2) {
+ state.previousTuple = tuple;
+ }
+
+ }
+
+ protected T valueFromTuple(TridentTuple tuple) {
+ // when there is no input field then the whole tuple is considered for comparison.
+ return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple);
+ }
+
+ @Override
+ public void complete(State state, TridentCollector collector) {
+ collector.emit(state.previousTuple.getValues());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
new file mode 100644
index 0000000..5385dfb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ */
+public class Max extends ComparisonAggregator<Comparable<Object>> {
+
+ public Max(String inputFieldName) {
+ super(inputFieldName);
+ }
+
+ @Override
+ protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
+ return value1.compareTo(value2) > 0 ? value1 : value2;
+ }
+
+ /**
+ * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ * @return
+ */
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
new file mode 100644
index 0000000..172aa58
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the maximum of aggregated tuples in a stream. It uses given {@code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MaxWithComparator<T> extends ComparisonAggregator<T> {
+ private final Comparator<T> comparator;
+
+ public MaxWithComparator(Comparator<T> comparator) {
+ this(null, comparator);
+ }
+
+ public MaxWithComparator(String inputFieldName, Comparator<T> comparator) {
+ super(inputFieldName);
+ this.comparator = comparator;
+ }
+
+ @Override
+ protected T compare(T value1, T value2) {
+ return comparator.compare(value1, value2) > 0 ? value1 : value2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
new file mode 100644
index 0000000..0757d7c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ */
+public class Min extends ComparisonAggregator<Comparable<Object>> {
+
+ public Min(String inputFieldName) {
+ super(inputFieldName);
+ }
+
+ @Override
+ protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
+ return value1.compareTo(value2) < 0 ? value1 : value2;
+ }
+
+ /**
+ * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
+ * it is an instance of {@code Comparable}.
+ *
+ * @return
+ * @param inputFieldName
+ */
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
new file mode 100644
index 0000000..d33e000
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.operation.builtin;
+
+import java.util.Comparator;
+
+/**
+ * This aggregator computes the minimum of aggregated tuples in a stream. It uses given @{code comparator} for comparing
+ * two values in a stream.
+ *
+ */
+public class MinWithComparator<T> extends ComparisonAggregator<T> {
+ private final Comparator<T> comparator;
+
+ public MinWithComparator(String inputFieldName, Comparator<T> comparator) {
+ super(inputFieldName);
+ this.comparator = comparator;
+ }
+
+ public MinWithComparator(Comparator<T> comparator) {
+ this(null, comparator);
+ }
+
+ @Override
+ protected T compare(T value1, T value2) {
+ return comparator.compare(value1, value2) < 0 ? value1 : value2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/5295a909/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
new file mode 100644
index 0000000..a4a9a79
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.trident.testing;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ *
+ */
+public class NumberGeneratorSpout implements IBatchSpout {
+ private final Fields fields;
+ private final int batchSize;
+ private final int maxNumber;
+ private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+ public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
+ this.fields = fields;
+ this.batchSize = batchSize;
+ this.maxNumber = maxNumber;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> values = null;
+ if(batches.containsKey(batchId)) {
+ values = batches.get(batchId);
+ } else {
+ values = new ArrayList<>();
+ for (int i = 0; i < batchSize; i++) {
+ values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1)));
+ }
+ batches.put(batchId, values);
+ }
+ for (List<Object> value : values) {
+ collector.emit(value);
+ }
+ }
+
+ @Override
+ public void ack(long batchId) {
+ batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return fields;
+ }
+}
[4/4] storm git commit: Added STORM-1511 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1511 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46999909
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46999909
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46999909
Branch: refs/heads/master
Commit: 46999909a7c2f1a8b3e436f314971e9a21467547
Parents: 68c87b5
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Feb 16 18:39:13 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:39:13 2016 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/46999909/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 36aa9d5..f50d05f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1511: min/max operators implementation in Trident streams API.
* STROM-1263: port backtype.storm.command.kill-topology to java
* STORM-1260: port backtype.storm.command.activate to java
* STORM-1261: port backtype.storm.command.deactivate to java
[3/4] storm git commit: Merge branch 'min-max' of
http://github.com/satishd/storm into STORM-1511
Posted by sr...@apache.org.
Merge branch 'min-max' of http://github.com/satishd/storm into STORM-1511
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/68c87b5e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/68c87b5e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/68c87b5e
Branch: refs/heads/master
Commit: 68c87b5eb2d254586fabfc81f5ed7addbb9fdc68
Parents: 82bea75 bc263cb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Feb 16 18:35:03 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Feb 16 18:35:03 2016 -0800
----------------------------------------------------------------------
.../spout/RandomNumberGeneratorSpout.java | 95 +++++++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 +++++++++++++++++
.../jvm/org/apache/storm/trident/Stream.java | 121 +++++++++--
.../operation/builtin/ComparisonAggregator.java | 91 +++++++++
.../storm/trident/operation/builtin/Max.java | 37 ++++
.../operation/builtin/MaxWithComparator.java | 51 +++++
.../storm/trident/operation/builtin/Min.java | 36 ++++
.../operation/builtin/MinWithComparator.java | 51 +++++
9 files changed, 850 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: Addressed review comments
Posted by sr...@apache.org.
Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc263cba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc263cba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc263cba
Branch: refs/heads/master
Commit: bc263cba67283b0c1ebe95be49e137fad86b3978
Parents: 5295a90
Author: Satish Duggana <sd...@hortonworks.com>
Authored: Mon Feb 8 16:49:46 2016 +0530
Committer: Satish Duggana <sd...@hortonworks.com>
Committed: Tue Feb 9 10:31:18 2016 +0530
----------------------------------------------------------------------
.../spout/RandomNumberGeneratorSpout.java | 95 +++++++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 ++++++++++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++++++++
.../TridentMinMaxOperationsTopology.java | 208 -------------------
.../jvm/org/apache/storm/trident/Stream.java | 24 ++-
.../operation/builtin/ComparisonAggregator.java | 23 +-
.../storm/trident/operation/builtin/Max.java | 6 -
.../operation/builtin/MaxWithComparator.java | 7 +
.../storm/trident/operation/builtin/Min.java | 8 -
.../operation/builtin/MinWithComparator.java | 7 +
.../trident/testing/NumberGeneratorSpout.java | 92 --------
11 files changed, 525 insertions(+), 326 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
new file mode 100644
index 0000000..1d1b082
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/spout/RandomNumberGeneratorSpout.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.spout;
+
+import org.apache.storm.Config;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This spout generates random whole numbers with given {@code maxNumber} value as maximum with the given {@code fields}.
+ *
+ */
+public class RandomNumberGeneratorSpout implements IBatchSpout {
+ private final Fields fields;
+ private final int batchSize;
+ private final int maxNumber;
+ private final Map<Long, List<List<Object>>> batches = new HashMap<>();
+
+ public RandomNumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
+ this.fields = fields;
+ this.batchSize = batchSize;
+ this.maxNumber = maxNumber;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void emitBatch(long batchId, TridentCollector collector) {
+ List<List<Object>> values = null;
+ if(batches.containsKey(batchId)) {
+ values = batches.get(batchId);
+ } else {
+ values = new ArrayList<>();
+ for (int i = 0; i < batchSize; i++) {
+ List<Object> numbers = new ArrayList<>();
+ for (int x=0; x<fields.size(); x++) {
+ numbers.add(ThreadLocalRandom.current().nextInt(0, maxNumber + 1));
+ }
+ values.add(numbers);
+ }
+ batches.put(batchId, values);
+ }
+ for (List<Object> value : values) {
+ collector.emit(value);
+ }
+ }
+
+ @Override
+ public void ack(long batchId) {
+ batches.remove(batchId);
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Config conf = new Config();
+ conf.setMaxTaskParallelism(1);
+ return conf;
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return fields;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
new file mode 100644
index 0000000..d985436
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfDevicesTopology.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.starter.spout.RandomNumberGeneratorSpout;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of
+ * * {@link Stream#minBy(String)}
+ * * {@link Stream#maxBy(String)}
+ * operations on trident {@link Stream}.
+ */
+public class TridentMinMaxOfDevicesTopology {
+
+ /**
+ * Creates a topology with device-id and count (which are whole numbers) as tuple fields in a stream and it finally
+ * generates result stream based on min amd max with device-id and count values.
+ */
+ public static StormTopology buildDevicesTopology() {
+ String deviceID = "device-id";
+ String count = "count";
+ Fields allFields = new Fields(deviceID, count);
+
+ RandomNumberGeneratorSpout spout = new RandomNumberGeneratorSpout(allFields, 10, 1000);
+
+ TridentTopology topology = new TridentTopology();
+ Stream devicesStream = topology.newStream("devicegen-spout", spout).
+ each(allFields, new Debug("##### devices"));
+
+ devicesStream.minBy(deviceID).
+ each(allFields, new Debug("#### device with min id"));
+
+ devicesStream.maxBy(count).
+ each(allFields, new Debug("#### device with max count"));
+
+ return topology.build();
+ }
+
+ /**
+ * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields
+ * with values {@link TridentMinMaxOfDevicesTopology.Vehicle} and {@link TridentMinMaxOfDevicesTopology.Driver} respectively.
+ */
+ public static StormTopology buildVehiclesTopology() {
+ Fields driverField = new Fields(Driver.FIELD_NAME);
+ Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+ Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+ FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream vehiclesStream = topology.newStream("spout1", spout).
+ each(allFields, new Debug("##### vehicles"));
+
+ Stream slowVehiclesStream =
+ vehiclesStream
+ .min(new SpeedComparator())
+ .each(vehicleField, new Debug("#### slowest vehicle"));
+
+ Stream slowDriversStream =
+ slowVehiclesStream
+ .project(driverField)
+ .each(driverField, new Debug("##### slowest driver"));
+
+ vehiclesStream
+ .max(new SpeedComparator())
+ .each(vehicleField, new Debug("#### fastest vehicle"))
+ .project(driverField)
+ .each(driverField, new Debug("##### fastest driver"));
+
+ vehiclesStream
+ .max(new EfficiencyComparator()).
+ each(vehicleField, new Debug("#### efficient vehicle"));
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ StormTopology topology = buildDevicesTopology();
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ if (args.length == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("devices-topology", conf, topology);
+ Utils.sleep(60 * 1000);
+ cluster.shutdown();
+ System.exit(0);
+ } else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar("devices-topology", conf, topology);
+ }
+ }
+
+ static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
+
+ @Override
+ public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+ Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+ Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+ return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+ }
+ }
+
+ static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
+
+ @Override
+ public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+ Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+ Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+ return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+ }
+
+ }
+
+ static class Driver implements Serializable {
+ static final String FIELD_NAME = "driver";
+ final String name;
+ final int id;
+
+ Driver(String name, int id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "Driver{" +
+ "name='" + name + '\'' +
+ ", id=" + id +
+ '}';
+ }
+ }
+
+ static class Vehicle implements Serializable {
+ static final String FIELD_NAME = "vehicle";
+ final String name;
+ final int maxSpeed;
+ final double efficiency;
+
+ public Vehicle(String name, int maxSpeed, double efficiency) {
+ this.name = name;
+ this.maxSpeed = maxSpeed;
+ this.efficiency = efficiency;
+ }
+
+ @Override
+ public String toString() {
+ return "Vehicle{" +
+ "name='" + name + '\'' +
+ ", maxSpeed=" + maxSpeed +
+ ", efficiency=" + efficiency +
+ '}';
+ }
+
+ public static List<Object>[] generateVehicles(int count) {
+ List<Object>[] vehicles = new List[count];
+ for (int i = 0; i < count; i++) {
+ int id = i - 1;
+ vehicles[i] =
+ (new Values(
+ new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+ new Driver("Driver-" + id, id)
+ ));
+ }
+ return vehicles;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
new file mode 100644
index 0000000..192b198
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOfVehiclesTopology.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class demonstrates different usages of
+ * * {@link Stream#minBy(String, Comparator)}
+ * * {@link Stream#min(Comparator)}
+ * * {@link Stream#maxBy(String, Comparator)}
+ * * {@link Stream#max(Comparator)}
+ * operations on trident {@link Stream}.
+ */
+public class TridentMinMaxOfVehiclesTopology {
+
+ /**
+ * Creates a topology which demonstrates min/max operations on tuples of stream which contain vehicle and driver fields
+ * with values {@link TridentMinMaxOfVehiclesTopology.Vehicle} and {@link TridentMinMaxOfVehiclesTopology.Driver} respectively.
+ */
+ public static StormTopology buildVehiclesTopology() {
+ Fields driverField = new Fields(Driver.FIELD_NAME);
+ Fields vehicleField = new Fields(Vehicle.FIELD_NAME);
+ Fields allFields = new Fields(Vehicle.FIELD_NAME, Driver.FIELD_NAME);
+
+ FixedBatchSpout spout = new FixedBatchSpout(allFields, 10, Vehicle.generateVehicles(20));
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream vehiclesStream = topology.newStream("spout1", spout).
+ each(allFields, new Debug("##### vehicles"));
+
+ Stream slowVehiclesStream =
+ vehiclesStream
+ .min(new SpeedComparator())
+ .each(vehicleField, new Debug("#### slowest vehicle"));
+
+ Stream slowDriversStream =
+ slowVehiclesStream
+ .project(driverField)
+ .each(driverField, new Debug("##### slowest driver"));
+
+ vehiclesStream
+ .max(new SpeedComparator())
+ .each(vehicleField, new Debug("#### fastest vehicle"))
+ .project(driverField)
+ .each(driverField, new Debug("##### fastest driver"));
+
+ vehiclesStream
+ .minBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+ each(vehicleField, new Debug("#### least efficient vehicle"));
+
+ vehiclesStream
+ .maxBy(Vehicle.FIELD_NAME, new EfficiencyComparator()).
+ each(vehicleField, new Debug("#### most efficient vehicle"));
+
+ return topology.build();
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ StormTopology topology = buildVehiclesTopology();
+ Config conf = new Config();
+ conf.setMaxSpoutPending(20);
+ if (args.length == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("vehicles-topology", conf, topology);
+ Utils.sleep(60 * 1000);
+ cluster.shutdown();
+ System.exit(0);
+ } else {
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar("vehicles-topology", conf, topology);
+ }
+ }
+
+ static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
+
+ @Override
+ public int compare(TridentTuple tuple1, TridentTuple tuple2) {
+ Vehicle vehicle1 = (Vehicle) tuple1.getValueByField(Vehicle.FIELD_NAME);
+ Vehicle vehicle2 = (Vehicle) tuple2.getValueByField(Vehicle.FIELD_NAME);
+ return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
+ }
+ }
+
+ static class EfficiencyComparator implements Comparator<Vehicle>, Serializable {
+
+ @Override
+ public int compare(Vehicle vehicle1, Vehicle vehicle2) {
+ return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
+ }
+
+ }
+
+ static class Driver implements Serializable {
+ static final String FIELD_NAME = "driver";
+ final String name;
+ final int id;
+
+ Driver(String name, int id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "Driver{" +
+ "name='" + name + '\'' +
+ ", id=" + id +
+ '}';
+ }
+ }
+
+ static class Vehicle implements Serializable {
+ static final String FIELD_NAME = "vehicle";
+ final String name;
+ final int maxSpeed;
+ final double efficiency;
+
+ public Vehicle(String name, int maxSpeed, double efficiency) {
+ this.name = name;
+ this.maxSpeed = maxSpeed;
+ this.efficiency = efficiency;
+ }
+
+ @Override
+ public String toString() {
+ return "Vehicle{" +
+ "name='" + name + '\'' +
+ ", maxSpeed=" + maxSpeed +
+ ", efficiency=" + efficiency +
+ '}';
+ }
+
+ public static List<Object>[] generateVehicles(int count) {
+ List<Object>[] vehicles = new List[count];
+ for (int i = 0; i < count; i++) {
+ int id = i - 1;
+ vehicles[i] =
+ (new Values(
+ new Vehicle("Vehicle-" + id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
+ new Driver("Driver-" + id, id)
+ ));
+ }
+ return vehicles;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
deleted file mode 100644
index dedaaff..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentMinMaxOperationsTopology.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.starter.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.trident.Stream;
-import org.apache.storm.trident.TridentTopology;
-import org.apache.storm.trident.operation.BaseFunction;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.operation.builtin.Debug;
-import org.apache.storm.trident.testing.FixedBatchSpout;
-import org.apache.storm.trident.testing.NumberGeneratorSpout;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.io.Serializable;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * This class contains different usages of minBy, maxBy, min and max operations on trident streams.
- *
- */
-public class TridentMinMaxOperationsTopology {
- public static class Split extends BaseFunction {
- @Override
- public void execute(TridentTuple tuple, TridentCollector collector) {
- String sentence = tuple.getString(0);
- for (String word : sentence.split(" ")) {
- collector.emit(new Values(word));
- }
- }
- }
-
- public static StormTopology buildIdsTopology() {
- NumberGeneratorSpout spout = new NumberGeneratorSpout(new Fields("id"), 10, 1000);
-
- TridentTopology topology = new TridentTopology();
- Stream wordsStream = topology.newStream("numgen-spout", spout).
- each(new Fields("id"), new Debug("##### ids"));
-
- wordsStream.minBy("id").
- each(new Fields("id"), new Debug("#### min-id"));
-
- wordsStream.maxBy("id").
- each(new Fields("id"), new Debug("#### max-id"));
-
- return topology.build();
- }
-
- public static StormTopology buildWordsTopology() {
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
- new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
- new Values("how many apples can you eat"), new Values("to be or not to be the person"));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- Stream wordsStream = topology.newStream("spout1", spout).parallelismHint(16).
- each(new Fields("sentence"), new Split(), new Fields("word")).
- each(new Fields("word"), new Debug("##### words"));
-
- wordsStream.minBy("word").
- each(new Fields("word"), new Debug("#### lowest word"));
-
- wordsStream.maxBy("word").
- each(new Fields("word"), new Debug("#### highest word"));
-
- return topology.build();
- }
-
- public static StormTopology buildVehiclesTopology() {
-
- FixedBatchSpout spout = new FixedBatchSpout(new Fields("vehicle", "driver"), 10, Vehicle.generateVehicles(20));
- spout.setCycle(true);
-
- TridentTopology topology = new TridentTopology();
- Stream vehiclesStream = topology.newStream("spout1", spout).
- each(new Fields("vehicle"), new Debug("##### vehicles"));
-
- vehiclesStream.min(new SpeedComparator())
- .each(new Fields("vehicle"), new Debug("#### slowest vehicle"))
- .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### slowest driver"));
-
- vehiclesStream.max(new SpeedComparator())
- .each(new Fields("vehicle"), new Debug("#### fastest vehicle"))
- .project(new Fields("driver")). each(new Fields("driver"), new Debug("##### fastest driver"));
-
- vehiclesStream.max(new EfficiencyComparator()).
- each(new Fields("vehicle"), new Debug("#### efficient vehicle"));
-
- return topology.build();
- }
-
- public static void main(String[] args) throws Exception {
- Config conf = new Config();
- conf.setMaxSpoutPending(20);
- StormTopology[] topologies = {buildWordsTopology(), buildIdsTopology(), buildVehiclesTopology()};
- if (args.length == 0) {
- for (StormTopology topology : topologies) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("min-max-topology", conf, topology);
- Utils.sleep(60*1000);
- cluster.shutdown();
- }
- System.exit(0);
- } else {
- conf.setNumWorkers(3);
- int ct=1;
- for (StormTopology topology : topologies) {
- StormSubmitter.submitTopologyWithProgressBar(args[0]+"-"+ct++, conf, topology);
- }
- }
- }
-
- static class SpeedComparator implements Comparator<TridentTuple>, Serializable {
-
- @Override
- public int compare(TridentTuple tuple1, TridentTuple tuple2) {
- Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
- Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
- return Integer.compare(vehicle1.maxSpeed, vehicle2.maxSpeed);
- }
- }
-
- static class EfficiencyComparator implements Comparator<TridentTuple>, Serializable {
-
- @Override
- public int compare(TridentTuple tuple1, TridentTuple tuple2) {
- Vehicle vehicle1 = (Vehicle) tuple1.getValueByField("vehicle");
- Vehicle vehicle2 = (Vehicle) tuple2.getValueByField("vehicle");
- return Double.compare(vehicle1.efficiency, vehicle2.efficiency);
- }
-
- }
-
- static class Driver implements Serializable {
- final String name;
- final int id;
-
- Driver(String name, int id) {
- this.name = name;
- this.id = id;
- }
-
- @Override
- public String toString() {
- return "Driver{" +
- "name='" + name + '\'' +
- ", id=" + id +
- '}';
- }
- }
-
- static class Vehicle implements Serializable {
- final String name;
- final int maxSpeed;
- final double efficiency;
-
- public Vehicle(String name, int maxSpeed, double efficiency) {
- this.name = name;
- this.maxSpeed = maxSpeed;
- this.efficiency = efficiency;
- }
-
- @Override
- public String toString() {
- return "Vehicle{" +
- "name='" + name + '\'' +
- ", maxSpeed=" + maxSpeed +
- ", efficiency=" + efficiency +
- '}';
- }
-
- public static List<Object>[] generateVehicles(int count) {
- List<Object>[] vehicles = new List[count];
- for(int i=0; i<count; i++) {
- int id = i-1;
- vehicles[i] =
- (new Values(
- new Vehicle("Vehicle-"+id, ThreadLocalRandom.current().nextInt(0, 100), ThreadLocalRandom.current().nextDouble(1, 5)),
- new Driver("Driver-"+id, id)
- ));
- }
- return vehicles;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/Stream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/Stream.java b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
index fa62b72..d313678 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/Stream.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/Stream.java
@@ -448,10 +448,11 @@ public class Stream implements IAggregatableStream {
/**
* This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} and it is
- * assumed that its value is an instance of {@code Comparable}.
+ * assumed that its value is an instance of {@code Comparable}. If the value of tuple with field {@code inputFieldName} is not an
+ * instance of {@code Comparable} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
- * @return
+ * @return the new stream with this operation.
*/
public Stream minBy(String inputFieldName) {
Aggregator<ComparisonAggregator.State> min = new Min(inputFieldName);
@@ -460,12 +461,13 @@ public class Stream implements IAggregatableStream {
/**
* This aggregator operation computes the minimum of tuples by the given {@code inputFieldName} in a stream by
- * using the given {@code comparator}.
+ * using the given {@code comparator}. If the value of tuple with field {@code inputFieldName} is not an
+ * instance of {@code T} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @param comparator comparator used in for finding minimum of two tuple values of {@code inputFieldName}.
* @param <T> type of tuple's given input field value.
- * @return
+ * @return the new stream with this operation.
*/
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {
Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(inputFieldName, comparator);
@@ -477,7 +479,7 @@ public class Stream implements IAggregatableStream {
* {@code TridentTuple}s.
*
* @param comparator comparator used in for finding minimum of two tuple values.
- * @return
+ * @return the new stream with this operation.
*/
public Stream min(Comparator<TridentTuple> comparator) {
Aggregator<ComparisonAggregator.State> min = new MinWithComparator<>(comparator);
@@ -486,10 +488,11 @@ public class Stream implements IAggregatableStream {
/**
* This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} and it is
- * assumed that its value is an instance of {@code Comparable}.
+ * assumed that its value is an instance of {@code Comparable}. If the value of tuple with field {@code inputFieldName} is not an
+ * instance of {@code Comparable} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
- * @return
+ * @return the new stream with this operation.
*/
public Stream maxBy(String inputFieldName) {
Aggregator<ComparisonAggregator.State> max = new Max(inputFieldName);
@@ -498,12 +501,13 @@ public class Stream implements IAggregatableStream {
/**
* This aggregator operation computes the maximum of tuples by the given {@code inputFieldName} in a stream by
- * using the given {@code comparator}.
+ * using the given {@code comparator}. If the value of tuple with field {@code inputFieldName} is not an
+ * instance of {@code T} then it throws {@code ClassCastException}
*
* @param inputFieldName input field name
* @param comparator comparator used in for finding maximum of two tuple values of {@code inputFieldName}.
* @param <T> type of tuple's given input field value.
- * @return
+ * @return the new stream with this operation.
*/
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {
Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(inputFieldName, comparator);
@@ -515,7 +519,7 @@ public class Stream implements IAggregatableStream {
* {@code TridentTuple}s.
*
* @param comparator comparator used in for finding maximum of two tuple values.
- * @return
+ * @return the new stream with this operation.
*/
public Stream max(Comparator<TridentTuple> comparator) {
Aggregator<ComparisonAggregator.State> max = new MaxWithComparator<>(comparator);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
index 0109bb5..82b657a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/ComparisonAggregator.java
@@ -21,12 +21,16 @@ package org.apache.storm.trident.operation.builtin;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstract {@code Aggregator} for comparing two values in a stream.
*
*/
public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonAggregator.State> {
+ private static final Logger log = LoggerFactory.getLogger(ComparisonAggregator.class);
+ private Object batchId;
public static class State {
TridentTuple previousTuple;
@@ -42,6 +46,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
@Override
public State init(Object batchId, TridentCollector collector) {
+ this.batchId = batchId;
+ log.debug("Started comparison aggregation for batch: [{}] in operation [{}]", batchId, this);
return new State();
}
@@ -50,6 +56,8 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
T value1 = valueFromTuple(state.previousTuple);
T value2 = valueFromTuple(tuple);
+ log.debug("Aggregated tuple value in state [{}], and received tuple value [{}] in operation [{}]", value1, value2, this);
+
if(value2 == null) {
return;
}
@@ -62,11 +70,22 @@ public abstract class ComparisonAggregator<T> extends BaseAggregator<ComparisonA
protected T valueFromTuple(TridentTuple tuple) {
// when there is no input field then the whole tuple is considered for comparison.
- return (T) (inputFieldName != null && tuple != null ? tuple.getValueByField(inputFieldName) : tuple);
+ Object value = null;
+ if (inputFieldName != null && tuple != null) {
+ value = tuple.getValueByField(inputFieldName);
+ } else {
+ value = tuple;
+ }
+
+ log.debug("value from tuple is [{}] with input field [{}] and tuple [{}]", value, inputFieldName, tuple);
+
+ return (T) value;
}
@Override
public void complete(State state, TridentCollector collector) {
- collector.emit(state.previousTuple.getValues());
+ log.debug("Completed comparison aggregation for batch [{}] with resultant tuple: [{}] in operation [{}]", batchId, state.previousTuple, this);
+
+ collector.emit(state.previousTuple != null ? state.previousTuple.getValues() : null);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
index 5385dfb..f1221b0 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Max.java
@@ -34,10 +34,4 @@ public class Max extends ComparisonAggregator<Comparable<Object>> {
return value1.compareTo(value2) > 0 ? value1 : value2;
}
- /**
- * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
- * it is an instance of {@code Comparable}.
- *
- * @return
- */
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
index 172aa58..0e8ae90 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MaxWithComparator.java
@@ -41,4 +41,11 @@ public class MaxWithComparator<T> extends ComparisonAggregator<T> {
protected T compare(T value1, T value2) {
return comparator.compare(value1, value2) > 0 ? value1 : value2;
}
+
+ @Override
+ public String toString() {
+ return "MaxWithComparator{" +
+ "comparator=" + comparator +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
index 0757d7c..010a919 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/Min.java
@@ -33,12 +33,4 @@ public class Min extends ComparisonAggregator<Comparable<Object>> {
protected Comparable<Object> compare(Comparable<Object> value1, Comparable<Object> value2) {
return value1.compareTo(value2) < 0 ? value1 : value2;
}
-
- /**
- * Returns an aggregator computes the maximum of aggregated tuples in a stream. It assumes that the tuple has one value and
- * it is an instance of {@code Comparable}.
- *
- * @return
- * @param inputFieldName
- */
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
index d33e000..64144cb 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/operation/builtin/MinWithComparator.java
@@ -41,4 +41,11 @@ public class MinWithComparator<T> extends ComparisonAggregator<T> {
protected T compare(T value1, T value2) {
return comparator.compare(value1, value2) < 0 ? value1 : value2;
}
+
+ @Override
+ public String toString() {
+ return "MinWithComparator{" +
+ "comparator=" + comparator +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc263cba/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java b/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
deleted file mode 100644
index a4a9a79..0000000
--- a/storm-core/src/jvm/org/apache/storm/trident/testing/NumberGeneratorSpout.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.storm.trident.testing;
-
-import org.apache.storm.Config;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IBatchSpout;
-import org.apache.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- *
- */
-public class NumberGeneratorSpout implements IBatchSpout {
- private final Fields fields;
- private final int batchSize;
- private final int maxNumber;
- private final Map<Long, List<List<Object>>> batches = new HashMap<>();
-
- public NumberGeneratorSpout(Fields fields, int batchSize, int maxNumber) {
- this.fields = fields;
- this.batchSize = batchSize;
- this.maxNumber = maxNumber;
- }
-
- @Override
- public void open(Map conf, TopologyContext context) {
- }
-
- @Override
- public void emitBatch(long batchId, TridentCollector collector) {
- List<List<Object>> values = null;
- if(batches.containsKey(batchId)) {
- values = batches.get(batchId);
- } else {
- values = new ArrayList<>();
- for (int i = 0; i < batchSize; i++) {
- values.add(Collections.singletonList((Object) ThreadLocalRandom.current().nextInt(0, maxNumber + 1)));
- }
- batches.put(batchId, values);
- }
- for (List<Object> value : values) {
- collector.emit(value);
- }
- }
-
- @Override
- public void ack(long batchId) {
- batches.remove(batchId);
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
- @Override
- public Fields getOutputFields() {
- return fields;
- }
-}