You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/12 08:29:13 UTC
[05/10] storm git commit: STORM-2447: add in storm local to avoid
having server on worker classpath
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jms-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
index 1e10fde..591cf7d 100644
--- a/examples/storm-jms-examples/pom.xml
+++ b/examples/storm-jms-examples/pom.xml
@@ -60,7 +60,7 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<!-- keep storm out of the jar-with-dependencies -->
<scope>${provided.scope}</scope>
@@ -112,39 +112,6 @@
</executions>
</plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <version>1.2.1</version>
- <executions>
- <execution>
- <goals>
- <goal>exec</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <executable>java</executable>
- <includeProjectDependencies>true</includeProjectDependencies>
- <includePluginDependencies>true</includePluginDependencies>
- <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass>
- <systemProperties>
- <systemProperty>
- <key>log4j.configuration</key>
- <value>file:./src/main/resources/log4j.properties</value>
- </systemProperty>
- </systemProperties>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
- <version>${project.version}</version>
- <type>jar</type>
- </dependency>
- </dependencies>
- </plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
index 82dbd5b..accb052 100644
--- a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -23,8 +23,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.jms.JmsMessageProducer;
import org.apache.storm.jms.JmsProvider;
@@ -34,7 +32,6 @@ import org.apache.storm.jms.spout.JmsSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
-import org.apache.storm.utils.Utils;
public class ExampleJmsTopology {
public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
@@ -111,21 +108,13 @@ public class ExampleJmsTopology {
JMS_TOPIC_SPOUT);
Config conf = new Config();
-
+ String topoName = "storm-jms-example";
if (args.length > 0) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopology(args[0], conf,
- builder.createTopology());
- } else {
-
- conf.setDebug(true);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("storm-jms-example", conf, builder.createTopology());) {
- Utils.sleep(60000);
- }
+ topoName = args[0];
}
- }
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(topoName, conf,
+ builder.createTopology());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index e319346..d0877c8 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
@@ -134,4 +134,4 @@
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index edd1f09..9ab00f5 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -18,6 +18,13 @@
package org.apache.storm.kafka.trident;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
@@ -33,13 +40,6 @@ import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
public class TridentKafkaClientWordCountNamedTopics {
private static final String TOPIC_1 = "test-trident";
private static final String TOPIC_2 = "test-trident-1";
@@ -93,49 +93,19 @@ public class TridentKafkaClientWordCountNamedTopics {
System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl, topic1, topic2);
- Config tpConf = LocalSubmitter.defaultConfig(true);
-
- if (args.length == 4) { //Submit Remote
- // Producers
- StormSubmitter.submitTopology(topic1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
- StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
- // Consumer
- StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
-
- // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
- Thread.sleep(2000);
- DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
-
- } else { //Submit Local
-
- final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
- final String topic1Tp = "topic1-producer";
- final String topic2Tp = "topic2-producer";
- final String consTpName = "topics-consumer";
-
- try {
- // Producers
- localSubmitter.submit(topic1Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
- localSubmitter.submit(topic2Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
- // Consumer
- try {
- localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
- localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
- // print
- localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- } finally {
- // kill
- localSubmitter.kill(topic1Tp);
- localSubmitter.kill(topic2Tp);
- localSubmitter.kill(consTpName);
- // shutdown
- localSubmitter.shutdown();
- }
- }
+ Config tpConf = new Config();
+ tpConf.setDebug(true);
+ tpConf.setMaxSpoutPending(5);
+
+ // Producers
+ StormSubmitter.submitTopology(topic1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
+ StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
+ // Consumer
+ StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+ // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+ Thread.sleep(2000);
+ DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
}
System.exit(0); // Kill all the non daemon threads
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index 8e89aec..f2d927d 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -35,7 +35,7 @@
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
index b08fc96..3476d9f 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -18,17 +18,16 @@
package org.apache.storm.kafka.trident;
-import org.apache.storm.LocalDRPC;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import org.apache.storm.generated.DistributedRPC;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
public class DrpcResultsPrinter {
private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
@@ -39,13 +38,6 @@ public class DrpcResultsPrinter {
}
/**
- * @return local DRPC client running on the same JVML
- */
- public static DrpcResultsPrinter localClient() {
- return new DrpcResultsPrinter(new LocalDRPC());
- }
-
- /**
* @return remote DRPC client running on local host, on port 3772, with defaults.yaml config
*/
public static DrpcResultsPrinter remoteClient() {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
deleted file mode 100644
index 9666695..0000000
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalDRPC;
-import org.apache.storm.generated.StormTopology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-public class LocalSubmitter {
- protected static final Logger LOG = LoggerFactory.getLogger(LocalSubmitter.class);
-
- private LocalDRPC drpc;
- private LocalCluster cluster;
-
- public LocalSubmitter(LocalDRPC drpc, LocalCluster cluster) {
- this.drpc = drpc;
- this.cluster = cluster;
- }
-
- public static LocalSubmitter newInstance() {
- try {
- return new LocalSubmitter(new LocalDRPC(), new LocalCluster());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public static Config defaultConfig() {
- return defaultConfig(false);
- }
-
- public static Config defaultConfig(boolean debug) {
- final Config conf = new Config();
- conf.setMaxSpoutPending(20);
- conf.setDebug(debug);
- return conf;
- }
-
- public LocalSubmitter(StormTopology topology, LocalDRPC drpc, LocalCluster cluster, String name) {
- this(drpc, cluster);
- }
-
- public void submit(String name, Config config, StormTopology topology) {
- try {
- cluster.submitTopology(name, config, topology);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Prints the DRPC results for the amount of time specified
- */
- public void printResults(int num, int time, TimeUnit unit) {
- for (int i = 0; i < num; i++) {
- try {
- LOG.info("--- DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
- System.out.println();
- Thread.sleep(unit.toMillis(time));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public void kill (String name) {
- try {
- cluster.killTopology(name);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void shutdown() {
- cluster.shutdown();
- }
-
- public LocalDRPC getDrpc() {
- return drpc;
- }
-
- public LocalCluster getCluster() {
- return cluster;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index a39eba1..e571210 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -18,7 +18,6 @@
package org.apache.storm.kafka.trident;
-import org.apache.storm.LocalDRPC;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.trident.DebugMemoryMapState;
import org.apache.storm.trident.Stream;
@@ -30,7 +29,6 @@ import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.spout.ITridentDataSource;
-import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
@@ -44,22 +42,13 @@ public class TridentKafkaConsumerTopology {
* See {@link TridentKafkaConsumerTopology#newTopology(LocalDRPC, ITridentDataSource)}
*/
public static StormTopology newTopology(ITridentDataSource tridentSpout) {
- return newTopology(null, tridentSpout);
- }
-
- /**
- * @param drpc The DRPC stream to be used in querying the word counts. Can be null in distributed mode
- * @return a trident topology that consumes sentences from the kafka topic specified using a
- * {@link TransactionalTridentKafkaSpout} computes the word count and stores it in a {@link MemoryMapState}.
- */
- public static StormTopology newTopology(LocalDRPC drpc, ITridentDataSource tridentSpout) {
final TridentTopology tridentTopology = new TridentTopology();
- addDRPCStream(tridentTopology, addTridentState(tridentTopology, tridentSpout), drpc);
+ addDRPCStream(tridentTopology, addTridentState(tridentTopology, tridentSpout));
return tridentTopology.build();
}
- private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
- return tridentTopology.newDRPCStream("words", drpc)
+ private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state) {
+ return tridentTopology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
index 11ea899..ad785b8 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaTopology.java
@@ -18,9 +18,10 @@
package org.apache.storm.kafka.trident;
+import java.util.Properties;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
@@ -30,8 +31,6 @@ import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
-import java.util.Properties;
-
public class TridentKafkaTopology {
private static StormTopology buildTopology(String brokerConnectionString) {
@@ -79,10 +78,6 @@ public class TridentKafkaTopology {
if(args.length < 1) {
System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
}
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));) {
- Thread.sleep(60 * 1000);
- }
+ StormSubmitter.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index 84dc380..6aa54ea 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -77,40 +77,24 @@ public class TridentKafkaWordCount implements Serializable {
public static void main(String[] args) throws Exception {
final String[] zkBrokerUrl = parseUrl(args);
final String topicName = "test";
- Config tpConf = LocalSubmitter.defaultConfig();
+ Config tpConf = new Config();
+ tpConf.setMaxSpoutPending(20);
+ String prodTpName = "kafkaBolt";
+ String consTpName = "wordCounter";
- if (args.length == 3) { //Submit Remote
- // Producer
- StormSubmitter.submitTopology(args[2] + "-producer", tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
- // Consumer
- StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
- new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
-
- // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
- Thread.sleep(2000);
- DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
- } else { //Submit Local
- final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
- final String prodTpName = "kafkaBolt";
- final String consTpName = "wordCounter";
-
- try {
- // Producer
- localSubmitter.submit(prodTpName, tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
- // Consumer
- localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(localSubmitter.getDrpc(),
- new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
-
- // print
- new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
- } finally {
- // kill
- localSubmitter.kill(prodTpName);
- localSubmitter.kill(consTpName);
- // shutdown
- localSubmitter.shutdown();
- }
+ if (args.length == 3) {
+ prodTpName = args[2] + "-producer";
+ consTpName = args[2] + "-consumer";
}
+ // Producer
+ StormSubmitter.submitTopology(prodTpName, tpConf, KafkaProducerTopology.newTopology(zkBrokerUrl[1], topicName));
+ // Consumer
+ StormSubmitter.submitTopology(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
+ new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+ // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+ Thread.sleep(2000);
+ DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
}
private static String[] parseUrl(String[] args) {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml
index e4a2a92..16200fc 100644
--- a/examples/storm-mongodb-examples/pom.xml
+++ b/examples/storm-mongodb-examples/pom.xml
@@ -30,7 +30,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
index 6f71b1c..3b27fd1 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/InsertWordCount.java
@@ -18,17 +18,12 @@
package org.apache.storm.mongodb.topology;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
import org.apache.storm.mongodb.bolt.MongoInsertBolt;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
public class InsertWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -65,17 +60,13 @@ public class InsertWordCount {
builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(INSERT_BOLT, insertBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
System.out.println("Usage: InsertWordCount <mongodb url> <mongodb collection> [topology name]");
+ return;
}
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
index 5140685..c9d43bd 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/LookupWordCount.java
@@ -18,7 +18,6 @@
package org.apache.storm.mongodb.topology;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.mongodb.bolt.MongoLookupBolt;
import org.apache.storm.mongodb.common.QueryFilterCreator;
@@ -64,17 +63,14 @@ public class LookupWordCount {
builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(TOTAL_COUNT_BOLT, totalBolt, 1).fieldsGrouping(LOOKUP_BOLT, new Fields("word"));
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
System.out.println("Usage: LookupWordCount <mongodb url> <mongodb collection> [topology name]");
+ return;
}
+
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
index b4af4ca..8e4f757 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/topology/UpdateWordCount.java
@@ -18,16 +18,14 @@
package org.apache.storm.mongodb.topology;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
import org.apache.storm.mongodb.common.QueryFilterCreator;
import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
+import org.apache.storm.mongodb.common.mapper.MongoUpdateMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoUpdateMapper;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
public class UpdateWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -73,17 +71,13 @@ public class UpdateWordCount {
builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(UPDATE_BOLT, updateBolt, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
-
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
System.out.println("Usage: UpdateWordCount <mongodb url> <mongodb collection> [topology name]");
+ return;
}
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
index 14dccbd..ef5a39a 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTrident.java
@@ -18,8 +18,6 @@
package org.apache.storm.mongodb.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
@@ -74,19 +72,15 @@ public class WordCountTrident {
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));) {
- Thread.sleep(60 * 1000);
- }
- System.exit(0);
- }
- else if(args.length == 3) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
- } else{
+ String topoName = "wordCounter";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3 || args.length < 2) {
System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+ return;
}
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1]));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
----------------------------------------------------------------------
diff --git a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
index 83c0caf..0ff2c51 100644
--- a/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
+++ b/examples/storm-mongodb-examples/src/main/java/org/apache/storm/mongodb/trident/WordCountTridentMap.java
@@ -18,14 +18,13 @@
package org.apache.storm.mongodb.trident;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.mongodb.common.QueryFilterCreator;
import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
-import org.apache.storm.mongodb.trident.state.*;
+import org.apache.storm.mongodb.trident.state.MongoMapState;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
@@ -76,20 +75,15 @@ public class WordCountTridentMap {
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(5);
- if (args.length == 2) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("wordCounter", conf, buildTopology(args[0], args[1]));
- Thread.sleep(60 * 1000);
- cluster.killTopology("wordCounter");
- cluster.shutdown();
- System.exit(0);
- }
- else if(args.length == 3) {
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[2], conf, buildTopology(args[0], args[1]));
- } else{
+ String topoName = "wordCounter";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3 || args.length < 2) {
System.out.println("Usage: WordCountTrident <mongodb url> <mongodb collection> [topology name]");
+ return;
}
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopology(topoName, conf, buildTopology(args[0], args[1]));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-mqtt-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml
index 41ff820..a178c5b 100644
--- a/examples/storm-mqtt-examples/pom.xml
+++ b/examples/storm-mqtt-examples/pom.xml
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml
index 9f550e6..4ec799b 100644
--- a/examples/storm-opentsdb-examples/pom.xml
+++ b/examples/storm-opentsdb-examples/pom.xml
@@ -30,7 +30,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
index 7dc65fd..009366f 100644
--- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java
@@ -18,17 +18,15 @@
*/
package org.apache.storm.opentsdb;
+import java.util.Collections;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.opentsdb.bolt.OpenTsdbBolt;
import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
import org.apache.storm.opentsdb.client.OpenTsdbClient;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
-import java.util.Collections;
-
/**
* Sample application to use OpenTSDB bolt.
*/
@@ -51,19 +49,12 @@ public class SampleOpenTsdbBoltTopology {
Config conf = new Config();
conf.setDebug(true);
-
+ String topoName = "word-count";
if (args.length > 1) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology());
- } else {
- conf.setMaxTaskParallelism(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("word-count", conf, topologyBuilder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
+ topoName = args[1];
}
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topologyBuilder.createTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
index 3220068..8ac950b 100644
--- a/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
+++ b/examples/storm-opentsdb-examples/src/main/java/org/apache/storm/opentsdb/SampleOpenTsdbTridentTopology.java
@@ -18,14 +18,14 @@
*/
package org.apache.storm.opentsdb;
+import java.util.Collections;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
import org.apache.storm.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
import org.apache.storm.opentsdb.client.OpenTsdbClient;
import org.apache.storm.opentsdb.trident.OpenTsdbStateFactory;
import org.apache.storm.opentsdb.trident.OpenTsdbStateUpdater;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
@@ -33,8 +33,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
-
/**
* Sample trident topology to store time series metrics in to OpenTsdb.
*/
@@ -67,20 +65,12 @@ public class SampleOpenTsdbTridentTopology {
Config conf = new Config();
conf.setDebug(true);
-
+ String topoName = "word-count";
if (args.length > 1) {
- conf.setNumWorkers(3);
-
- StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build());
- } else {
- conf.setMaxTaskParallelism(3);
-
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("word-count", conf, tridentTopology.build())) {
- Thread.sleep(30000);
- }
- System.exit(0);
+ topoName = args[1];
}
+ conf.setNumWorkers(3);
+ StormSubmitter.submitTopologyWithProgressBar(topoName, conf, tridentTopology.build());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml
index 260eb06..1625090 100644
--- a/examples/storm-perf/pom.xml
+++ b/examples/storm-perf/pom.xml
@@ -83,7 +83,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<!--
Use "provided" scope to keep storm out of the jar-with-dependencies
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
index 11c63d3..69df3fb 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
@@ -19,7 +19,6 @@
package org.apache.storm.perf;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.bolt.DevNullBolt;
import org.apache.storm.perf.bolt.IdBolt;
@@ -49,7 +48,7 @@ public class ConstSpoutIdBoltNullBoltTopo {
public static final String BOLT2_COUNT = "bolt2.count";
public static final String SPOUT_COUNT = "spout.count";
- public static StormTopology getTopology(Map conf) {
+ public static StormTopology getTopology(Map<String, Object> conf) {
// 1 - Setup Spout --------
ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -75,27 +74,19 @@ public class ConstSpoutIdBoltNullBoltTopo {
public static void main(String[] args) throws Exception {
-
- if (args.length <= 0) {
- // submit to local cluster
- Config conf = new Config();
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // submit to real cluster
- if (args.length >2) {
- System.err.println("args: runDurationSec [optionalConfFile]");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config();
-
- // Submit topology to storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ int runTime = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
}
+ // Submit topology to storm cluster
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
index 92c2787..298c73e 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutNullBoltTopo.java
@@ -18,8 +18,9 @@
package org.apache.storm.perf;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.bolt.DevNullBolt;
import org.apache.storm.perf.spout.ConstSpout;
@@ -28,8 +29,6 @@ import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;
-import java.util.Map;
-
/***
* This topo helps measure the messaging speed between a spout and a bolt.
* Spout generates a stream of a fixed string.
@@ -51,7 +50,7 @@ public class ConstSpoutNullBoltTopo {
public static final String SHUFFLE_GROUPING = "shuffle";
public static final String DEFAULT_GROUPING = LOCAL_GROPING;
- public static StormTopology getTopology(Map conf) {
+ public static StormTopology getTopology(Map<String, Object> conf) {
// 1 - Setup Spout --------
ConstSpout spout = new ConstSpout("some data").withOutputFields("str");
@@ -78,29 +77,20 @@ public class ConstSpoutNullBoltTopo {
* ConstSpout -> DevNullBolt with configurable grouping (default localOrShuffle)
*/
public static void main(String[] args) throws Exception {
-
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- Config conf = new Config();
- final LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
-
- } else {
- // For measuring perf against a Storm cluster
- if (args.length > 2) {
- System.err.println("args: runDurationSec [optionalConfFile]");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = (args.length==2) ? Utils.findAndReadConfigFile(args[1]) : new Config();
-
- // Submit topology to storm cluster
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ int runTime = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
}
+ // Submit topology to storm cluster
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
index 721ae3d..94bd17f 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutOnlyTopo.java
@@ -19,11 +19,11 @@
package org.apache.storm.perf;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.spout.ConstSpout;
import org.apache.storm.perf.utils.Helper;
import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.Utils;
/***
@@ -52,23 +52,19 @@ public class ConstSpoutOnlyTopo {
* ConstSpout only topology (No bolts)
*/
public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology());
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit topology to storm cluster
- if (args.length != 1) {
- System.err.println("args: runDurationSec");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, new Config(), getTopology());
+ int runTime = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
}
+ // Submit topology to storm cluster
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
index d518c86..e64dd36 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/FileReadWordCountTopo.java
@@ -18,8 +18,9 @@
package org.apache.storm.perf;
+import java.util.Map;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.perf.bolt.CountBolt;
import org.apache.storm.perf.bolt.SplitSentenceBolt;
@@ -29,9 +30,6 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
-
-import java.util.Map;
-
/***
* This topo helps measure speed of word count.
* Spout loads a file into memory on initialization, then emits the lines in an endless loop.
@@ -54,7 +52,7 @@ public class FileReadWordCountTopo {
public static final int DEFAULT_COUNT_BOLT_NUM = 2;
- public static StormTopology getTopology(Map config) {
+ public static StormTopology getTopology(Map<String, Object> config) {
final int spoutNum = Helper.getInt(config, SPOUT_NUM, DEFAULT_SPOUT_NUM);
final int spBoltNum = Helper.getInt(config, SPLIT_NUM, DEFAULT_SPLIT_BOLT_NUM);
@@ -70,27 +68,19 @@ public class FileReadWordCountTopo {
}
public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // For IDE based profiling ... submit topology to local cluster
- Config conf = new Config();
- conf.put(INPUT_FILE, "resources/randomwords.txt");
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(conf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit to Storm cluster
- if (args.length !=2) {
- System.err.println("args: runDurationSec confFile");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
-
+ int runTime = -1;
+ Config topoConf = new Config();
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+ if (args.length > 1) {
+ topoConf.putAll(Utils.findAndReadConfigFile(args[1]));
+ }
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [optionalConfFile]");
+ return;
}
+ // Submit topology to storm cluster
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
index 5b97540..ed81a4a 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/StrGenSpoutHdfsBoltTopo.java
@@ -19,7 +19,8 @@
package org.apache.storm.perf;
-import org.apache.storm.LocalCluster;
+import java.util.Map;
+
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
@@ -35,8 +36,6 @@ import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
-import java.util.Map;
-
/***
* This topo helps measure speed of writing to Hdfs
* Spout generates fixed length random strings.
@@ -64,7 +63,7 @@ public class StrGenSpoutHdfsBoltTopo {
public static final String BOLT_ID = "hdfsBolt";
- public static StormTopology getTopology(Map topoConf) {
+ public static StormTopology getTopology(Map<String, Object> topoConf) {
final int hdfsBatch = Helper.getInt(topoConf, HDFS_BATCH, DEFAULT_HDFS_BATCH);
// 1 - Setup StringGen Spout --------
@@ -104,30 +103,30 @@ public class StrGenSpoutHdfsBoltTopo {
/** Spout generates random strings and HDFS bolt writes them to a text file */
public static void main(String[] args) throws Exception {
- if(args.length <= 0) {
- // submit to local cluster
- Map topoConf = Utils.findAndReadConfigFile("conf/HdfsSpoutTopo.yaml");
- LocalCluster cluster = Helper.runOnLocalCluster(TOPOLOGY_NAME, getTopology(topoConf));
-
- Helper.setupShutdownHook(cluster, TOPOLOGY_NAME);
- while (true) {// run indefinitely till Ctrl-C
- Thread.sleep(20_000_000);
- }
- } else {
- // Submit to Storm cluster
- if (args.length !=2) {
- System.err.println("args: runDurationSec confFile");
- return;
- }
- Integer durationSec = Integer.parseInt(args[0]);
- Map topoConf = Utils.findAndReadConfigFile(args[1]);
-
- Helper.runOnClusterAndPrintMetrics(durationSec, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
+ String confFile = "conf/HdfsSpoutTopo.yaml";
+ int runTime = -1; //Run until Ctrl-C
+ if (args.length > 0) {
+ runTime = Integer.parseInt(args[0]);
+ }
+
+ if (args.length > 1) {
+ confFile = args[1];
+ }
+
+ // Submit to Storm cluster
+ if (args.length > 2) {
+ System.err.println("args: [runDurationSec] [confFile]");
+ return;
}
+
+ Map<String, Object> topoConf = Utils.findAndReadConfigFile(confFile);
+
+ Helper.runOnClusterAndPrintMetrics(runTime, TOPOLOGY_NAME, topoConf, getTopology(topoConf));
}
public static class LineWriter implements RecordFormat {
+ private static final long serialVersionUID = 7524288317405514146L;
private String lineDelimiter = System.lineSeparator();
private String fieldName;
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
index e01de0e..a008888 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/BasicMetricsCollector.java
@@ -18,7 +18,6 @@
package org.apache.storm.perf.utils;
-import org.apache.storm.LocalCluster;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.utils.Utils;
import org.apache.log4j.Logger;
@@ -27,10 +26,7 @@ import java.io.PrintWriter;
import java.util.*;
-public class BasicMetricsCollector {
-
- private LocalCluster localCluster = null;
- private Nimbus.Client client = null;
+public class BasicMetricsCollector implements AutoCloseable {
private PrintWriter dataWriter;
private long startTime=0;
@@ -80,20 +76,8 @@ public class BasicMetricsCollector {
private double maxLatency = 0;
boolean first = true;
-
- public BasicMetricsCollector(Nimbus.Client client, String topoName, Map stormConfig) {
- this(topoName, stormConfig);
- this.client = client;
- this.localCluster = null;
- }
-
- public BasicMetricsCollector(LocalCluster localCluster, String topoName, Map stormConfig) {
- this(topoName, stormConfig);
- this.client = null;
- this.localCluster = localCluster;
- }
-
- private BasicMetricsCollector(String topoName, Map stormConfig) {
+
+ public BasicMetricsCollector(String topoName, Map<String, Object> stormConfig) {
Set<MetricsItem> items = getMetricsToCollect();
this.config = new MetricsCollectorConfig(topoName, stormConfig);
collectTopologyStats = collectTopologyStats(items);
@@ -104,14 +88,13 @@ public class BasicMetricsCollector {
dataWriter = new PrintWriter(System.err);
}
-
private Set<MetricsItem> getMetricsToCollect() {
Set<MetricsItem> result = new HashSet<>();
result.add(MetricsItem.ALL);
return result;
}
- public void collect(Nimbus.Client client) {
+ public void collect(Nimbus.Iface client) {
try {
if (!first) {
this.lastSample = this.curSample;
@@ -130,25 +113,7 @@ public class BasicMetricsCollector {
}
}
- public void collect(LocalCluster localCluster) {
- try {
- if (!first) {
- this.lastSample = this.curSample;
- this.curSample = MetricsSample.factory(localCluster, config.name);
- updateStats(dataWriter);
- writeLine(dataWriter);
- } else {
- LOG.info("Getting baseline metrics sample.");
- writeHeader(dataWriter);
- this.curSample = MetricsSample.factory(localCluster, config.name);
- first = false;
- startTime = System.currentTimeMillis();
- }
- } catch (Exception e) {
- LOG.error("storm metrics failed! ", e);
- }
- }
-
+ @Override
public void close() {
dataWriter.close();
}
@@ -287,13 +252,13 @@ public class BasicMetricsCollector {
private static final Logger LOG = Logger.getLogger(MetricsCollectorConfig.class);
// storm configuration
- public final Map stormConfig;
+ public final Map<String, Object> stormConfig;
// storm topology name
public final String name;
// benchmark label
public final String label;
- public MetricsCollectorConfig(String topoName, Map stormConfig) {
+ public MetricsCollectorConfig(String topoName, Map<String, Object> stormConfig) {
this.stormConfig = stormConfig;
String labelStr = (String) stormConfig.get("benchmark.label");
this.name = topoName;
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
index 465ff33..d26078a 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/Helper.java
@@ -18,39 +18,23 @@
package org.apache.storm.perf.utils;
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
+import java.util.Map;
+
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.NimbusClient;
-
-import java.util.Map;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
public class Helper {
- public static void kill(Nimbus.Client client, String topoName) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- client.killTopologyWithOpts(topoName, opts);
- }
-
- public static void killAndShutdownCluster(LocalCluster cluster, String topoName) throws Exception {
- KillOptions opts = new KillOptions();
- opts.set_wait_secs(0);
- cluster.killTopologyWithOpts(topoName, opts);
- cluster.shutdown();
- }
-
-
- public static LocalCluster runOnLocalCluster(String topoName, StormTopology topology) throws Exception {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(topoName, new Config(), topology);
- return cluster;
+ public static void kill(Nimbus.Iface client, String topoName) throws Exception {
+ KillOptions opts = new KillOptions();
+ opts.set_wait_secs(0);
+ client.killTopologyWithOpts(topoName, opts);
}
public static int getInt(Map map, Object key, int def) {
@@ -61,72 +45,52 @@ public class Helper {
return (String) map.get(key);
}
- public static void collectMetricsAndKill(String topologyName, Integer pollInterval, Integer duration) throws Exception {
- Map clusterConf = Utils.readStormConfig();
- Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
- BasicMetricsCollector metricsCollector = new BasicMetricsCollector(client, topologyName, clusterConf);
-
- int times = duration / pollInterval;
- metricsCollector.collect(client);
- for (int i = 0; i < times; i++) {
- Thread.sleep(pollInterval * 1000);
- metricsCollector.collect(client);
+ public static void collectMetricsAndKill(String topologyName, Integer pollInterval, int duration) throws Exception {
+ Map<String, Object> clusterConf = Utils.readStormConfig();
+ Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+ try (BasicMetricsCollector metricsCollector = new BasicMetricsCollector(topologyName, clusterConf)) {
+
+ if (duration > 0) {
+ int times = duration / pollInterval;
+ metricsCollector.collect(client);
+ for (int i = 0; i < times; i++) {
+ Thread.sleep(pollInterval * 1000);
+ metricsCollector.collect(client);
+ }
+ } else {
+ while (true) { //until Ctrl-C
+ metricsCollector.collect(client);
+ Thread.sleep(pollInterval * 1000);
+ }
+ }
+ } finally {
+ kill(client, topologyName);
}
- metricsCollector.close();
- kill(client, topologyName);
}
- public static void collectLocalMetricsAndKill(LocalCluster localCluster, String topologyName, Integer pollInterval, Integer duration, Map clusterConf) throws Exception {
- BasicMetricsCollector metricsCollector = new BasicMetricsCollector(localCluster, topologyName, clusterConf);
-
- int times = duration / pollInterval;
- metricsCollector.collect(localCluster);
- for (int i = 0; i < times; i++) {
- Thread.sleep(pollInterval * 1000);
- metricsCollector.collect(localCluster);
- }
- metricsCollector.close();
- killAndShutdownCluster(localCluster, topologyName);
+ /** Kill topo on Ctrl-C */
+ public static void setupShutdownHook(final String topoName) {
+ Map clusterConf = Utils.readStormConfig();
+ final Nimbus.Iface client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ Helper.kill(client, topoName);
+ System.out.println("Killed Topology");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
}
- /** Kill topo and Shutdown local cluster on Ctrl-C */
- public static void setupShutdownHook(final LocalCluster cluster, final String topoName) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- cluster.killTopology(topoName);
- System.out.println("Killed Topology");
- } catch (Exception e) {
- System.err.println("Encountered error in killing topology: " + e);
- }
- cluster.shutdown();
- }
- });
- }
-
- /** Kill topo on Ctrl-C */
- public static void setupShutdownHook(final String topoName) {
- Map clusterConf = Utils.readStormConfig();
- final Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- Helper.kill(client, topoName);
- System.out.println("Killed Topology");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- public static void runOnClusterAndPrintMetrics(Integer durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception {
- // submit topology
- StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
- setupShutdownHook(topoName); // handle Ctrl-C
+ public static void runOnClusterAndPrintMetrics(int durationSec, String topoName, Map topoConf, StormTopology topology) throws Exception {
+ // submit topology
+ StormSubmitter.submitTopologyWithProgressBar(topoName, topoConf, topology);
+ setupShutdownHook(topoName); // handle Ctrl-C
- // poll metrics every minute, then kill topology after specified duration
- Integer pollIntervalSec = 60;
- collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
+ // poll metrics every minute, then kill topology after specified duration
+ Integer pollIntervalSec = 60;
+ collectMetricsAndKill(topoName, pollIntervalSec, durationSec);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
index a934120..fbcea13 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
@@ -18,7 +18,9 @@
package org.apache.storm.perf.utils;
-import org.apache.storm.LocalCluster;
+import java.util.List;
+import java.util.Map;
+
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ExecutorSpecificStats;
import org.apache.storm.generated.ExecutorStats;
@@ -29,9 +31,6 @@ import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.utils.Utils;
-import java.util.List;
-import java.util.Map;
-
public class MetricsSample {
private long sampleTime = -1;
@@ -54,7 +53,7 @@ public class MetricsSample {
private int totalSlots = 0;
private int usedSlots = 0;
- public static MetricsSample factory(Nimbus.Client client, String topologyName) throws Exception {
+ public static MetricsSample factory(Nimbus.Iface client, String topologyName) throws Exception {
// "************ Sampling Metrics *****************
ClusterSummary clusterSummary = client.getClusterInfo();
@@ -72,12 +71,6 @@ public class MetricsSample {
return sample;
}
- public static MetricsSample factory(LocalCluster localCluster, String topologyName) throws Exception {
- TopologyInfo topologyInfo = localCluster.getTopologyInfo(topologyName);;
- return getMetricsSample(topologyInfo);
- }
-
-
private static MetricsSample getMetricsSample(TopologyInfo topInfo) {
List<ExecutorSummary> executorSummaries = topInfo.get_executors();
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-pmml-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-pmml-examples/pom.xml b/examples/storm-pmml-examples/pom.xml
index d202076..a1699d2 100644
--- a/examples/storm-pmml-examples/pom.xml
+++ b/examples/storm-pmml-examples/pom.xml
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
@@ -76,4 +76,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
index 1e5521e..c330f98 100644
--- a/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
+++ b/examples/storm-pmml-examples/src/main/java/org/apache/storm/pmml/JpmmlRunnerTestTopology.java
@@ -18,11 +18,15 @@
package org.apache.storm.pmml;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.pmml.model.ModelOutputs;
@@ -36,14 +40,7 @@ import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Arrays;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
* Topology that loads a PMML Model and raw input data from a CSV file. The {@link RawInputFromCSVSpout}
@@ -66,8 +63,7 @@ public class JpmmlRunnerTestTopology {
private File rawInputs; // Raw input data to be scored (predicted)
private File pmml; // PMML Model read from file - null if using Blobstore
private String blobKey; // PMML Model downloaded from Blobstore - null if using File
- private boolean isLocal;
- private String tplgyName;
+ private String tplgyName = "test";
public static void main(String[] args) throws Exception {
try {
@@ -89,7 +85,6 @@ public class JpmmlRunnerTestTopology {
printUsage();
} else {
try {
- isLocal = true;
for (int i = 0; i < args.length; ) {
switch (args[i]) {
case "-f":
@@ -106,7 +101,6 @@ public class JpmmlRunnerTestTopology {
break;
default:
tplgyName = args[i];
- isLocal = false;
i++;
break;
}
@@ -155,11 +149,7 @@ public class JpmmlRunnerTestTopology {
private void run() throws Exception {
System.out.println(String.format("Running topology using PMML model loaded from [%s] and raw input data loaded from [%s]",
blobKey != null ? "Blobstore with blob key [" + blobKey + "]" : pmml.getAbsolutePath(), rawInputs.getAbsolutePath()));
- if (isLocal) {
- submitTopologyLocalCluster(newTopology(), newConfig());
- } else {
- submitTopologyRemoteCluster(newTopology(), newConfig());
- }
+ submitTopologyRemoteCluster(newTopology(), newConfig());
}
private StormTopology newTopology() throws Exception {
@@ -171,26 +161,10 @@ public class JpmmlRunnerTestTopology {
return builder.createTopology();
}
- private void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(tplgyName, config, topology);
- stopWaitingForInput();
- }
-
private void submitTopologyRemoteCluster(StormTopology topology, Config config) throws Exception {
StormSubmitter.submitTopology(tplgyName, config, topology);
}
- private void stopWaitingForInput() {
- try {
- System.out.println("PRESS ENTER TO STOP");
- new BufferedReader(new InputStreamReader(System.in)).readLine();
- System.exit(0);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
private Config newConfig() {
Config config = new Config();
config.setDebug(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/pom.xml b/examples/storm-redis-examples/pom.xml
index 9317daf..d03d8b7 100644
--- a/examples/storm-redis-examples/pom.xml
+++ b/examples/storm-redis-examples/pom.xml
@@ -30,7 +30,7 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
- <artifactId>storm-server</artifactId>
+ <artifactId>storm-client</artifactId>
<version>${project.version}</version>
<scope>${provided.scope}</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
index 17a088b..43798d0 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/LookupWordCount.java
@@ -17,10 +17,16 @@
*/
package org.apache.storm.redis.topology;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisLookupBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -30,18 +36,10 @@ import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import com.google.common.collect.Lists;
-import org.apache.storm.redis.bolt.RedisLookupBolt;
-import org.apache.storm.redis.common.config.JedisPoolConfig;
-import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
-import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import com.google.common.collect.Lists;
public class LookupWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -109,17 +107,14 @@ public class LookupWordCount {
builder.setBolt(LOOKUP_BOLT, lookupBolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(LOOKUP_BOLT);
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
+ return;
}
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
private static RedisLookupMapper setupLookupMapper() {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
index 36cec89..4c7b0dc 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/PersistentWordCount.java
@@ -18,25 +18,14 @@
package org.apache.storm.redis.topology;
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.ITuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
-import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import redis.clients.jedis.JedisCommands;
-import redis.clients.jedis.exceptions.JedisConnectionException;
-import redis.clients.jedis.exceptions.JedisException;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
public class PersistentWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
@@ -72,17 +61,14 @@ public class PersistentWordCount {
builder.setBolt(COUNT_BOLT, bolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("word"));
builder.setBolt(STORE_BOLT, storeBolt, 1).shuffleGrouping(COUNT_BOLT);
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else {
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
+ return;
}
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
private static RedisStoreMapper setupStoreMapper() {
http://git-wip-us.apache.org/repos/asf/storm/blob/b254ede4/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
index 1cccd9b..4ac410a 100644
--- a/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
+++ b/examples/storm-redis-examples/src/main/java/org/apache/storm/redis/topology/WhitelistWordCount.java
@@ -17,9 +17,10 @@
*/
package org.apache.storm.redis.topology;
+import java.util.Map;
+import java.util.Random;
+
import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
import org.apache.storm.StormSubmitter;
import org.apache.storm.redis.bolt.RedisFilterBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
@@ -36,9 +37,6 @@ import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.Random;
-
public class WhitelistWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String WHITELIST_BOLT = "WHITELIST_BOLT";
@@ -106,17 +104,14 @@ public class WhitelistWordCount {
builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);
- if (args.length == 2) {
- try (LocalCluster cluster = new LocalCluster();
- LocalTopology topo = cluster.submitTopology("test", config, builder.createTopology());) {
- Thread.sleep(30000);
- }
- System.exit(0);
- } else if (args.length == 3) {
- StormSubmitter.submitTopology(args[2], config, builder.createTopology());
- } else{
- System.out.println("Usage: WhitelistWordCount <redis host> <redis port> (topology name)");
+ String topoName = "test";
+ if (args.length == 3) {
+ topoName = args[2];
+ } else if (args.length > 3) {
+ System.out.println("Usage: WhitelistWordCount <redis host> <redis port> [topology name]");
+ return;
}
+ StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}
private static RedisFilterMapper setupWhitelistMapper() {