You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/02/01 23:43:06 UTC
[1/3] storm git commit: STORM-2320: DRPC client printer class
reusable for local and remote DRPC
Repository: storm
Updated Branches:
refs/heads/master 251cb8876 -> 1064ed107
STORM-2320: DRPC client printer class reusable for local and remote DRPC
- Client necessary to check for DRPC results while running in distributed mode
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c23d93c5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c23d93c5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c23d93c5
Branch: refs/heads/master
Commit: c23d93c5ebda38b66dad1177f49a6e97fb2f1957
Parents: 11db3ea
Author: Hugo Louro <hm...@gmail.com>
Authored: Fri Dec 23 17:09:06 2016 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Mon Jan 23 18:29:12 2017 -0800
----------------------------------------------------------------------
.../TridentKafkaClientWordCountNamedTopics.java | 9 ++-
.../storm/kafka/trident/DrpcResultsPrinter.java | 85 ++++++++++++++++++++
.../trident/TridentKafkaConsumerTopology.java | 13 ++-
.../kafka/trident/TridentKafkaWordCount.java | 8 +-
4 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 2482eae..1bdfe65 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -105,7 +105,7 @@ public class TridentKafkaClientWordCountNamedTopics {
new TridentKafkaClientWordCountNamedTopics().run(args);
}
- protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
if (args.length > 0 && Arrays.stream(args).anyMatch(option -> option.equals("-h"))) {
System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", getClass().getName(),
"broker_host:broker_port", "topic1", "topic2", "topology_name");
@@ -124,6 +124,11 @@ public class TridentKafkaClientWordCountNamedTopics {
StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
// Consumer
StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+ // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+ Thread.sleep(2000);
+ DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+
} else { //Submit Local
final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
@@ -140,7 +145,7 @@ public class TridentKafkaClientWordCountNamedTopics {
localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
// print
- localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
+ new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
} finally {
// kill
localSubmitter.kill(topic1Tp);
http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
new file mode 100644
index 0000000..f71e2df
--- /dev/null
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/** Runs the "words" DRPC function repeatedly against a local or remote DRPC client and logs each result. */
+public class DrpcResultsPrinter {
+    private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
+
+    private final DistributedRPC.Iface drpcClient;
+
+    public DrpcResultsPrinter(DistributedRPC.Iface drpcClient) {
+        this.drpcClient = drpcClient;
+    }
+
+    /** @return printer backed by a local DRPC client running in the same JVM */
+    public static DrpcResultsPrinter localClient() {
+        return new DrpcResultsPrinter(new LocalDRPC());
+    }
+
+    /** @return printer backed by a remote DRPC client for localhost, port 3772, with defaults.yaml config */
+    public static DrpcResultsPrinter remoteClient() {
+        return remoteClient(Utils.readDefaultConfig(), "localhost", 3772);
+    }
+
+    /** @return printer for a remote DRPC client at host:port using config; transport failures are rethrown unchecked */
+    public static DrpcResultsPrinter remoteClient(Map<String, Object> config, String host, int port) {
+        try {
+            return new DrpcResultsPrinter(new DRPCClient(config, host, port));
+        } catch (TTransportException e) {
+            throw new RuntimeException(String.format("DRPC Client failed to connect to DRPC server. " +
+                    "[host = %s], [port = %s], [config = %s]", host, port, config), e); // keep the cause
+        }
+    }
+
+    /**
+     * Prints the DRPC results for the number of times specified, sleeping the specified time after each print.
+     * @throws RuntimeException if the DRPC call fails or the thread is interrupted while sleeping
+     */
+    public void printResults(int num, int sleepTime, TimeUnit sleepUnit) {
+        for (int i = 0; i < num; i++) {
+            try {
+                LOG.info("--- DRPC RESULT: " + drpcClient.execute("words", "the and apple snow jumped"));
+                System.out.println();
+                Thread.sleep(sleepUnit.toMillis(sleepTime));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
+                throw new RuntimeException(e);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Prints the results from the default remote DRPC client 60 times, one second apart. */
+    public static void main(String[] args) {
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index b59f973..4669f52 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -26,6 +26,7 @@ import org.apache.storm.starter.trident.DebugMemoryMapState;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.operation.builtin.FilterNull;
@@ -33,6 +34,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.spout.ITridentDataSource;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,13 +68,20 @@ public class TridentKafkaConsumerTopology {
return tridentTopology.build();
}
- private static Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
+ private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
return tridentTopology.newDRPCStream("words", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
- .project(new Fields("word", "count"));
+ .project(new Fields("word", "count"))
+ .filter(new BaseFilter() {
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ LOG.debug("DRPC RESULT: " + tuple); // Used to show the DRPC results in the worker log. Useful for debugging
+ return true;
+ }
+ });
}
private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {
http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index 0247cce..84dc380 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -85,6 +85,10 @@ public class TridentKafkaWordCount implements Serializable {
// Consumer
StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+ // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+ Thread.sleep(2000);
+ DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
} else { //Submit Local
final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
final String prodTpName = "kafkaBolt";
@@ -98,7 +102,7 @@ public class TridentKafkaWordCount implements Serializable {
new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
// print
- localSubmitter.printResults(60, 1, TimeUnit.SECONDS);
+ new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
} finally {
// kill
localSubmitter.kill(prodTpName);
@@ -133,7 +137,7 @@ public class TridentKafkaWordCount implements Serializable {
TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
config.scheme = new SchemeAsMultiScheme(new StringScheme());
- // Consume new data from the topic��
+ // Consume new data from the topic
config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
return config;
}
[3/3] storm git commit: Added STORM-2320 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-2320 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1064ed10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1064ed10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1064ed10
Branch: refs/heads/master
Commit: 1064ed10740091fb38b884b2ded29dcceed3318f
Parents: 5ccef13
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 1 15:25:29 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 1 15:25:29 2017 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1064ed10/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2f7cdd..242a247 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -195,6 +195,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-2320: DRPC client printer class reusable for local and remote DRPC.
* STORM-2225: change spout config to be simpler.
* STORM-2330: Fix storm sql code generation for UDAF with non standard sql types
* STORM-2298: Don't kill Nimbus when ClusterMetricsConsumer is failed to initialize
[2/3] storm git commit: Merge branch
'Apache_STORM-master_PrintRemoteDRPC' of https://github.com/hmcl/storm-apache
into STORM-2320
Posted by sr...@apache.org.
Merge branch 'Apache_STORM-master_PrintRemoteDRPC' of https://github.com/hmcl/storm-apache into STORM-2320
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5ccef139
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5ccef139
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5ccef139
Branch: refs/heads/master
Commit: 5ccef1393de96ac3e87aaca8abc0b31a5c38457f
Parents: 251cb88 c23d93c
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 1 15:24:30 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 1 15:24:30 2017 -0800
----------------------------------------------------------------------
.../TridentKafkaClientWordCountNamedTopics.java | 9 ++-
.../storm/kafka/trident/DrpcResultsPrinter.java | 85 ++++++++++++++++++++
.../trident/TridentKafkaConsumerTopology.java | 13 ++-
.../kafka/trident/TridentKafkaWordCount.java | 8 +-
4 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5ccef139/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------