You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/02/01 23:43:06 UTC

[1/3] storm git commit: STORM-2320: DRPC client printer class reusable for local and remote DRPC

Repository: storm
Updated Branches:
  refs/heads/master 251cb8876 -> 1064ed107


STORM-2320: DRPC client printer class reusable for local and remote DRPC

  - Client necessary to check for DRPC results while running in distributed mode


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c23d93c5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c23d93c5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c23d93c5

Branch: refs/heads/master
Commit: c23d93c5ebda38b66dad1177f49a6e97fb2f1957
Parents: 11db3ea
Author: Hugo Louro <hm...@gmail.com>
Authored: Fri Dec 23 17:09:06 2016 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Mon Jan 23 18:29:12 2017 -0800

----------------------------------------------------------------------
 .../TridentKafkaClientWordCountNamedTopics.java |  9 ++-
 .../storm/kafka/trident/DrpcResultsPrinter.java | 85 ++++++++++++++++++++
 .../trident/TridentKafkaConsumerTopology.java   | 13 ++-
 .../kafka/trident/TridentKafkaWordCount.java    |  8 +-
 4 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 2482eae..1bdfe65 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -105,7 +105,7 @@ public class TridentKafkaClientWordCountNamedTopics {
         new TridentKafkaClientWordCountNamedTopics().run(args);
     }
 
-    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
         if (args.length > 0 && Arrays.stream(args).anyMatch(option -> option.equals("-h"))) {
             System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", getClass().getName(),
                     "broker_host:broker_port", "topic1", "topic2", "topology_name");
@@ -124,6 +124,11 @@ public class TridentKafkaClientWordCountNamedTopics {
                 StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
                 // Consumer
                 StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+                // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+                Thread.sleep(2000);
+                DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+
             } else { //Submit Local
 
                 final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
@@ -140,7 +145,7 @@ public class TridentKafkaClientWordCountNamedTopics {
                             localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
 
                     // print
-                    localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
+                    new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
                 } finally {
                     // kill
                     localSubmitter.kill(topic1Tp);

http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
new file mode 100644
index 0000000..f71e2df
--- /dev/null
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class DrpcResultsPrinter {
+    private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
+    // DRPC client (local or remote) against which the "words" function is executed
+    private final DistributedRPC.Iface drpcClient;
+
+    public DrpcResultsPrinter(DistributedRPC.Iface drpcClient) {
+        this.drpcClient = drpcClient;
+    }
+
+    /**
+     * @return printer backed by a new {@link LocalDRPC}, i.e. a local DRPC client running on the same JVM
+     */
+    public static DrpcResultsPrinter localClient() {
+        return new DrpcResultsPrinter(new LocalDRPC());
+    }
+
+    /**
+     * @return printer backed by a remote DRPC client for localhost:3772, using {@link Utils#readDefaultConfig()}
+     */
+    public static DrpcResultsPrinter remoteClient() {
+        return remoteClient(Utils.readDefaultConfig(), "localhost", 3772);
+    }
+
+    /**
+     * @return printer backed by a remote DRPC client for the specified host, port, with the provided config
+     */
+    public static DrpcResultsPrinter remoteClient(Map<String, Object> config, String host, int port) {
+        try {
+            return new DrpcResultsPrinter(new DRPCClient(config, host, port));
+        } catch (TTransportException e) {
+            throw new RuntimeException(String.format("DRPC Client failed to connect to DRPC server. " +
+                    "[host = %s], [port = %s], [config = %s]", host, port, config), e); // keep the root cause
+        }
+    }
+
+    /**
+     * Executes the "words" DRPC function num times, logging each result and sleeping the given time after each call
+     */
+    public void printResults(int num, int sleepTime, TimeUnit sleepUnit) {
+        for (int i = 0; i < num; i++) {
+            try {
+                LOG.info("--- DRPC RESULT: {}", drpcClient.execute("words", "the and apple snow jumped"));
+                System.out.println();
+                Thread.sleep(sleepUnit.toMillis(sleepTime));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index b59f973..4669f52 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -26,6 +26,7 @@ import org.apache.storm.starter.trident.DebugMemoryMapState;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.Debug;
 import org.apache.storm.trident.operation.builtin.FilterNull;
@@ -33,6 +34,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,13 +68,20 @@ public class TridentKafkaConsumerTopology {
         return tridentTopology.build();
     }
 
-    private static Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
+    private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
         return tridentTopology.newDRPCStream("words", drpc)
                 .each(new Fields("args"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
                 .each(new Fields("count"), new FilterNull())
-                .project(new Fields("word", "count"));
+                .project(new Fields("word", "count"))
+                .filter(new BaseFilter() {
+                    @Override
+                    public boolean isKeep(TridentTuple tuple) {
+                        LOG.debug("DRPC RESULT: {}", tuple);  // Pass-through filter: only logs each DRPC result in the worker log. Useful for debugging
+                        return true;
+                    }
+                });
     }
 
     private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c23d93c5/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index 0247cce..84dc380 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -85,6 +85,10 @@ public class TridentKafkaWordCount implements Serializable {
             // Consumer
             StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
                     new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+            // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+            Thread.sleep(2000);
+            DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
         } else { //Submit Local
             final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
             final String prodTpName = "kafkaBolt";
@@ -98,7 +102,7 @@ public class TridentKafkaWordCount implements Serializable {
                         new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
 
                 // print
-                localSubmitter.printResults(60, 1, TimeUnit.SECONDS);
+                new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
             } finally {
                 // kill
                 localSubmitter.kill(prodTpName);
@@ -133,7 +137,7 @@ public class TridentKafkaWordCount implements Serializable {
         TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
         config.scheme = new SchemeAsMultiScheme(new StringScheme());
 
-        // Consume new data from the topic��
+        // Consume new data from the topic
         config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
         return config;
     }


[3/3] storm git commit: Added STORM-2320 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-2320 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1064ed10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1064ed10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1064ed10

Branch: refs/heads/master
Commit: 1064ed10740091fb38b884b2ded29dcceed3318f
Parents: 5ccef13
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 1 15:25:29 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 1 15:25:29 2017 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1064ed10/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2f7cdd..242a247 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -195,6 +195,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0
+ * STORM-2320:  DRPC client printer class reusable for local and remote DRPC.
  * STORM-2225: change spout config to be simpler.
  * STORM-2330: Fix storm sql code generation for UDAF with non standard sql types
  * STORM-2298: Don't kill Nimbus when ClusterMetricsConsumer is failed to initialize


[2/3] storm git commit: Merge branch 'Apache_STORM-master_PrintRemoteDRPC' of https://github.com/hmcl/storm-apache into STORM-2320

Posted by sr...@apache.org.
Merge branch 'Apache_STORM-master_PrintRemoteDRPC' of https://github.com/hmcl/storm-apache into STORM-2320


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5ccef139
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5ccef139
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5ccef139

Branch: refs/heads/master
Commit: 5ccef1393de96ac3e87aaca8abc0b31a5c38457f
Parents: 251cb88 c23d93c
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Wed Feb 1 15:24:30 2017 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Wed Feb 1 15:24:30 2017 -0800

----------------------------------------------------------------------
 .../TridentKafkaClientWordCountNamedTopics.java |  9 ++-
 .../storm/kafka/trident/DrpcResultsPrinter.java | 85 ++++++++++++++++++++
 .../trident/TridentKafkaConsumerTopology.java   | 13 ++-
 .../kafka/trident/TridentKafkaWordCount.java    |  8 +-
 4 files changed, 109 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5ccef139/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------