You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:06 UTC
[24/50] [abbrv] storm git commit: merge flux into external/flux/
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 0000000,0000000..9456d1b
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@@ -1,0 -1,0 +1,234 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux;
++
++import backtype.storm.Config;
++import backtype.storm.generated.StormTopology;
++import org.apache.storm.flux.model.ExecutionContext;
++import org.apache.storm.flux.model.TopologyDef;
++import org.apache.storm.flux.parser.FluxParser;
++import org.apache.storm.flux.test.TestBolt;
++import org.junit.Test;
++
++import java.io.File;
++
++import static org.junit.Assert.*;
++
++/**
++ * Builds Storm topologies from the YAML definitions under {@code /configs} on the test
++ * classpath and checks that each resulting {@link StormTopology} passes
++ * {@code validate()} (or fails with the expected exception).
++ */
++public class TCKTest {
++
++    /**
++     * Parses a YAML topology definition using the argument set shared by every test
++     * here except the variable-substitution test, which supplies a properties file.
++     *
++     * @param resource classpath location of the YAML file
++     * @return the parsed topology definition
++     */
++    private static TopologyDef parse(String resource) throws Exception {
++        return FluxParser.parseResource(resource, false, true, null, false);
++    }
++
++    /**
++     * Builds the config, execution context and topology for a definition, asserts that
++     * the topology is non-null and runs {@code StormTopology.validate()} on it.
++     *
++     * @param topologyDef the parsed definition to build
++     * @return the execution context used for the build, for follow-up assertions
++     */
++    private static ExecutionContext buildAndValidate(TopologyDef topologyDef) throws Exception {
++        Config conf = FluxBuilder.buildConfig(topologyDef);
++        ExecutionContext context = new ExecutionContext(topologyDef, conf);
++        StormTopology topology = FluxBuilder.buildTopology(context);
++        assertNotNull(topology);
++        topology.validate();
++        return context;
++    }
++
++    @Test
++    public void testTCK() throws Exception {
++        buildAndValidate(parse("/configs/tck.yaml"));
++    }
++
++    @Test
++    public void testShellComponents() throws Exception {
++        buildAndValidate(parse("/configs/shell_test.yaml"));
++    }
++
++    @Test
++    public void testKafkaSpoutConfig() throws Exception {
++        buildAndValidate(parse("/configs/kafka_test.yaml"));
++    }
++
++    // NOTE(review): loads the same resource as testKafkaSpoutConfig -- confirm whether a
++    // different input was intended.
++    @Test
++    public void testLoadFromResource() throws Exception {
++        buildAndValidate(parse("/configs/kafka_test.yaml"));
++    }
++
++    @Test
++    public void testHdfs() throws Exception {
++        buildAndValidate(parse("/configs/hdfs_test.yaml"));
++    }
++
++    @Test
++    public void testHbase() throws Exception {
++        buildAndValidate(parse("/configs/simple_hbase.yaml"));
++    }
++
++    /** The bad_hbase config is expected to be rejected with an IllegalArgumentException. */
++    @Test(expected = IllegalArgumentException.class)
++    public void testBadHbase() throws Exception {
++        buildAndValidate(parse("/configs/bad_hbase.yaml"));
++    }
++
++    @Test
++    public void testIncludes() throws Exception {
++        TopologyDef topologyDef = parse("/configs/include_test.yaml");
++        buildAndValidate(topologyDef);
++        assertEquals("include-topology", topologyDef.getName());
++        assertTrue(topologyDef.getBolts().size() > 0);
++        assertTrue(topologyDef.getSpouts().size() > 0);
++    }
++
++    @Test
++    public void testTopologySource() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    @Test
++    public void testTopologySourceWithReflection() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology-reflection.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    @Test
++    public void testTopologySourceWithConfigParam() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology-reflection-config.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    @Test
++    public void testTopologySourceWithMethodName() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology-method-override.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    @Test
++    public void testTridentTopologySource() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology-trident.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    /**
++     * The definition must fail validation, and attempting to build it anyway must raise
++     * an IllegalArgumentException.
++     */
++    @Test(expected = IllegalArgumentException.class)
++    public void testInvalidTopologySource() throws Exception {
++        TopologyDef topologyDef = parse("/configs/invalid-existing-topology.yaml");
++        // The message is shown only when validate() unexpectedly returns true.
++        assertFalse("Expected the topology definition to fail validation.", topologyDef.validate());
++        Config conf = FluxBuilder.buildConfig(topologyDef);
++        ExecutionContext context = new ExecutionContext(topologyDef, conf);
++        FluxBuilder.buildTopology(context);
++    }
++
++    // NOTE(review): uses the same resource as testTopologySourceWithReflection -- confirm
++    // whether a different input was intended.
++    @Test
++    public void testTopologySourceWithGetMethodName() throws Exception {
++        TopologyDef topologyDef = parse("/configs/existing-topology-reflection.yaml");
++        assertTrue(topologyDef.validate());
++        buildAndValidate(topologyDef);
++    }
++
++    @Test
++    public void testTopologySourceWithConfigMethods() throws Exception {
++        TopologyDef topologyDef = parse("/configs/config-methods-test.yaml");
++        assertTrue(topologyDef.validate());
++        ExecutionContext context = buildAndValidate(topologyDef);
++
++        // make sure the properties were actually set by the configMethods in the YAML
++        TestBolt bolt = (TestBolt) context.getBolt("bolt-1");
++        assertEquals("foo", bolt.getFoo());
++        assertEquals("bar", bolt.getBar());
++        assertEquals("foobar", bolt.getFooBar());
++    }
++
++    @Test
++    public void testVariableSubstitution() throws Exception {
++        TopologyDef topologyDef = FluxParser.parseResource("/configs/substitution-test.yaml", false, true, "src/test/resources/configs/test.properties", true);
++        assertTrue(topologyDef.validate());
++        ExecutionContext context = buildAndValidate(topologyDef);
++
++        // test basic substitution
++        assertEquals("Property not replaced.",
++                "substitution-topology",
++                context.getTopologyDef().getName());
++
++        // test environment variable substitution
++        // $PATH should be defined on most systems
++        String envPath = System.getenv().get("PATH");
++        assertEquals("ENV variable not replaced.",
++                envPath,
++                context.getTopologyDef().getConfig().get("test.env.value"));
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
index 0000000,0000000..dcded17
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvirontmentTest.java
@@@ -1,0 -1,0 +1,89 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.multilang;
++
++
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.io.ByteArrayOutputStream;
++import java.io.InputStream;
++import java.io.OutputStream;
++
++import static org.junit.Assert.assertEquals;
++
++/**
++ * Sanity checks to make sure we can at least invoke the shells used.
++ *
++ * Each test launches the interpreter with {@code --version} and asserts that the
++ * process exits with status 0.
++ */
++public class MultilangEnvirontmentTest {
++    private static final Logger LOG = LoggerFactory.getLogger(MultilangEnvirontmentTest.class);
++
++    @Test
++    public void testInvokePython() throws Exception {
++        String[] command = new String[]{"python", "--version"};
++        int exitVal = invokeCommand(command);
++        assertEquals("Exit value for python is 0.", 0, exitVal);
++    }
++
++    @Test
++    public void testInvokeNode() throws Exception {
++        String[] command = new String[]{"node", "--version"};
++        int exitVal = invokeCommand(command);
++        assertEquals("Exit value for node is 0.", 0, exitVal);
++    }
++
++    /**
++     * Copies everything readable from an input stream to an output stream, one byte at
++     * a time, then closes both streams.
++     */
++    private static class StreamRedirect implements Runnable {
++        private final InputStream in;
++        private final OutputStream out;
++
++        public StreamRedirect(InputStream in, OutputStream out) {
++            this.in = in;
++            this.out = out;
++        }
++
++        @Override
++        public void run() {
++            try {
++                int b;
++                while ((b = this.in.read()) != -1) {
++                    this.out.write(b);
++                }
++            } catch (Exception e) {
++                LOG.error("Error while redirecting process output.", e);
++            } finally {
++                // Close both ends even if the copy loop failed part-way.
++                try {
++                    this.in.close();
++                } catch (Exception e) {
++                    LOG.warn("Unable to close process input stream.", e);
++                }
++                try {
++                    this.out.close();
++                } catch (Exception e) {
++                    LOG.warn("Unable to close output stream.", e);
++                }
++            }
++        }
++    }
++
++    /**
++     * Runs the given command with stderr merged into stdout, captures the combined
++     * output for debug logging, and waits for the process to finish.
++     *
++     * @param args the command and its arguments
++     * @return the exit value of the process
++     * @throws Exception if the process cannot be started or the wait is interrupted
++     */
++    private int invokeCommand(String[] args) throws Exception {
++        // Render the array explicitly: passing a String[] straight to the logger would
++        // be treated as the varargs list and only its first element would be printed.
++        LOG.debug("Invoking command: {}", java.util.Arrays.toString(args));
++
++        ProcessBuilder pb = new ProcessBuilder(args);
++        pb.redirectErrorStream(true);
++        final Process proc = pb.start();
++
++        ByteArrayOutputStream out = new ByteArrayOutputStream();
++        Thread t = new Thread(new StreamRedirect(proc.getInputStream(), out));
++        t.start();
++        int exitVal = proc.waitFor();
++        // Wait for the copier to reach end-of-stream so the captured output is complete
++        // before it is read below.
++        t.join();
++        LOG.debug("Command result: {}", out.toString());
++        return exitVal;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
index 0000000,0000000..0d37997
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopology.java
@@@ -1,0 -1,0 +1,42 @@@
++package org.apache.storm.flux.test;
++
++import backtype.storm.generated.StormTopology;
++import backtype.storm.topology.TopologyBuilder;
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
++
++import java.util.Map;
++
++/**
++ * Plain class used by the tests as a topology source. It does not implement
++ * TopologySource but exposes a {@code getTopology(Map)} method with the same shape,
++ * plus an alias under a different method name.
++ */
++public class SimpleTopology {
++
++    /** No-arg constructor. */
++    public SimpleTopology() {
++    }
++
++    /** Two-string constructor; both arguments are ignored. */
++    public SimpleTopology(String foo, String bar) {
++    }
++
++    /**
++     * Alias for {@link #getTopology(Map)} under a non-default method name.
++     *
++     * @param config passed straight through to {@code getTopology}
++     * @return the topology produced by {@code getTopology}
++     */
++    public StormTopology getTopologyWithDifferentMethodName(Map<String, Object> config) {
++        return getTopology(config);
++    }
++
++    /**
++     * Wires a single shell spout to a single logging bolt with a shuffle grouping.
++     *
++     * @param config not read by this implementation
++     * @return the assembled topology
++     */
++    public StormTopology getTopology(Map<String, Object> config) {
++        final String[] spoutCommand = new String[]{"node", "randomsentence.js"};
++        final String[] spoutFields = new String[]{"word"};
++
++        TopologyBuilder topologyBuilder = new TopologyBuilder();
++
++        // spout: shell process emitting the "word" field
++        topologyBuilder.setSpout("sentence-spout", new FluxShellSpout(spoutCommand, spoutFields), 1);
++
++        // bolt: logs whatever the spout emits
++        LogInfoBolt logBolt = new LogInfoBolt();
++        topologyBuilder.setBolt("log-bolt", logBolt, 1).shuffleGrouping("sentence-spout");
++
++        return topologyBuilder.createTopology();
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
index 0000000,0000000..2007082
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologySource.java
@@@ -1,0 -1,0 +1,35 @@@
++package org.apache.storm.flux.test;
++
++import backtype.storm.generated.StormTopology;
++import backtype.storm.topology.TopologyBuilder;
++import org.apache.storm.flux.api.TopologySource;
++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
++
++import java.util.Map;
++
++/**
++ * Test topology source that implements {@link TopologySource} directly.
++ */
++public class SimpleTopologySource implements TopologySource {
++
++    /** No-arg constructor. */
++    public SimpleTopologySource() {
++    }
++
++    /** Two-string constructor; both arguments are ignored. */
++    public SimpleTopologySource(String foo, String bar) {
++    }
++
++    /**
++     * Wires a single shell spout to a single logging bolt with a shuffle grouping.
++     *
++     * @param config not read by this implementation
++     * @return the assembled topology
++     */
++    @Override
++    public StormTopology getTopology(Map<String, Object> config) {
++        final String[] spoutCommand = new String[]{"node", "randomsentence.js"};
++        final String[] spoutFields = new String[]{"word"};
++
++        TopologyBuilder topologyBuilder = new TopologyBuilder();
++
++        // spout: shell process emitting the "word" field
++        topologyBuilder.setSpout("sentence-spout", new FluxShellSpout(spoutCommand, spoutFields), 1);
++
++        // bolt: logs whatever the spout emits
++        LogInfoBolt logBolt = new LogInfoBolt();
++        topologyBuilder.setBolt("log-bolt", logBolt, 1).shuffleGrouping("sentence-spout");
++
++        return topologyBuilder.createTopology();
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
index 0000000,0000000..f29b543
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/SimpleTopologyWithConfigParam.java
@@@ -1,0 -1,0 +1,38 @@@
++package org.apache.storm.flux.test;
++
++import backtype.storm.Config;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.topology.TopologyBuilder;
++import org.apache.storm.flux.wrappers.bolts.LogInfoBolt;
++import org.apache.storm.flux.wrappers.spouts.FluxShellSpout;
++
++import java.util.Map;
++
++/**
++ * Test topology source that does not implement TopologySource. Its
++ * {@code getTopology()} method takes a {@link Config} argument rather than a
++ * {@code Map}.
++ */
++public class SimpleTopologyWithConfigParam {
++
++    /** No-arg constructor. */
++    public SimpleTopologyWithConfigParam() {
++    }
++
++    /** Two-string constructor; both arguments are ignored. */
++    public SimpleTopologyWithConfigParam(String foo, String bar) {
++    }
++
++    /**
++     * Wires a single shell spout to a single logging bolt with a shuffle grouping.
++     *
++     * @param config not read by this implementation
++     * @return the assembled topology
++     */
++    public StormTopology getTopology(Config config) {
++        final String[] spoutCommand = new String[]{"node", "randomsentence.js"};
++        final String[] spoutFields = new String[]{"word"};
++
++        TopologyBuilder topologyBuilder = new TopologyBuilder();
++
++        // spout: shell process emitting the "word" field
++        topologyBuilder.setSpout("sentence-spout", new FluxShellSpout(spoutCommand, spoutFields), 1);
++
++        // bolt: logs whatever the spout emits
++        LogInfoBolt logBolt = new LogInfoBolt();
++        topologyBuilder.setBolt("log-bolt", logBolt, 1).shuffleGrouping("sentence-spout");
++
++        return topologyBuilder.createTopology();
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
index 0000000,0000000..e88f2cf
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TestBolt.java
@@@ -1,0 -1,0 +1,63 @@@
++package org.apache.storm.flux.test;
++
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Tuple;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++
++/**
++ * Bolt used by the tests: it logs each incoming tuple, declares no output fields, and
++ * exposes {@code withXxx} configuration methods whose effects can be read back through
++ * the matching getters.
++ */
++public class TestBolt extends BaseBasicBolt {
++    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
++
++    /** Enum type accepted by the constructors. */
++    public enum TestEnum {
++        FOO,
++        BAR
++    }
++
++    // values recorded by the config methods below
++    private String foo;
++    private String bar;
++    private String fooBar;
++
++    /** Constructor taking an enum value; the argument is not stored. */
++    public TestBolt(TestEnum te) {
++    }
++
++    /** Constructor taking an enum value and a float; neither argument is stored. */
++    public TestBolt(TestEnum te, float f) {
++    }
++
++    /** Logs the received tuple at INFO level and emits nothing. */
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
++        LOG.info("{}", tuple);
++    }
++
++    /** Declares no output fields. */
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
++    }
++
++    // ---- config methods ----
++
++    /** Records the given value as {@code foo}. */
++    public void withFoo(String foo) {
++        this.foo = foo;
++    }
++
++    /** Records the given value as {@code bar}. */
++    public void withBar(String bar) {
++        this.bar = bar;
++    }
++
++    /** Records the concatenation of the two arguments as {@code fooBar}. */
++    public void withFooBar(String foo, String bar) {
++        this.fooBar = foo + bar;
++    }
++
++    // ---- accessors ----
++
++    /** @return the value last passed to {@link #withFoo(String)}, or null if never set */
++    public String getFoo() {
++        return this.foo;
++    }
++
++    /** @return the value last passed to {@link #withBar(String)}, or null if never set */
++    public String getBar() {
++        return this.bar;
++    }
++
++    /** @return the concatenation recorded by {@link #withFooBar(String, String)}, or null if never set */
++    public String getFooBar() {
++        return this.fooBar;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
index 0000000,0000000..3cb6634
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java
@@@ -1,0 -1,0 +1,54 @@@
++package org.apache.storm.flux.test;
++
++import backtype.storm.Config;
++import backtype.storm.generated.StormTopology;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import storm.kafka.StringScheme;
++import storm.trident.TridentTopology;
++import storm.trident.operation.BaseFunction;
++import storm.trident.operation.TridentCollector;
++import storm.trident.operation.builtin.Count;
++import storm.trident.testing.FixedBatchSpout;
++import storm.trident.testing.MemoryMapState;
++import storm.trident.tuple.TridentTuple;
++
++/**
++ * Basic Trident example that will return a `StormTopology` from a `getTopology()` method.
++ */
++public class TridentTopologySource {
++
++ private FixedBatchSpout spout;
++
++ public StormTopology getTopology(Config config) {
++
++ this.spout = new FixedBatchSpout(new Fields("sentence"), 20,
++ new Values("one two"),
++ new Values("two three"),
++ new Values("three four"),
++ new Values("four five"),
++ new Values("five six")
++ );
++
++
++ TridentTopology trident = new TridentTopology();
++
++ trident.newStream("wordcount", spout).name("sentence").parallelismHint(1).shuffle()
++ .each(new Fields("sentence"), new Split(), new Fields("word"))
++ .parallelismHint(1)
++ .groupBy(new Fields("word"))
++ .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
++ .parallelismHint(1);
++ return trident.build();
++ }
++
++ public static class Split extends BaseFunction {
++ @Override
++ public void execute(TridentTuple tuple, TridentCollector collector) {
++ String sentence = tuple.getString(0);
++ for (String word : sentence.split(" ")) {
++ collector.emit(new Values(word));
++ }
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
index 0000000,0000000..5d91400
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/bad_hbase.yaml
@@@ -1,0 -1,0 +1,98 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test that an invalid component constructor argument is rejected (see "counterFields" below)
++---
++
++# topology definition
++# name to be used when submitting
++name: "hbase-wordcount"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++
++components:
++ - id: "columnFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ - ["word"]
++
++ - id: "counterFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ # !!! the following won't work, and should throw an IllegalArgumentException...
++ - "count"
++
++ - id: "mapper"
++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
++ configMethods:
++ - name: "withRowKeyField"
++ args: ["word"]
++ - name: "withColumnFields"
++ args: [ref: "columnFields"]
++ - name: "withCounterFields"
++ args: [ref: "counterFields"]
++ - name: "withColumnFamily"
++ args: ["cf"]
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ hbase.conf:
++ hbase.rootdir: "hdfs://hadoop:54310/hbase"
++ hbase.zookeeper.quorum: "hadoop"
++
++# spout definitions
++spouts:
++ - id: "word-spout"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++
++# bolt definitions
++
++bolts:
++ - id: "count-bolt"
++ className: "backtype.storm.testing.TestWordCounter"
++
++ - id: "hbase-bolt"
++ className: "org.apache.storm.hbase.bolt.HBaseBolt"
++ constructorArgs:
++ - "WordCount" # HBase table name
++ - ref: "mapper"
++ configMethods:
++ - name: "withConfigKey"
++ args: ["hbase.conf"]
++ parallelism: 1
++
++
++streams:
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "word-spout"
++ to: "count-bolt"
++ grouping:
++ type: SHUFFLE
++
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "count-bolt"
++ to: "hbase-bolt"
++ grouping:
++ type: FIELDS
++ args: ["word"]
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
index 0000000,0000000..65211ff
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/config-methods-test.yaml
@@@ -1,0 -1,0 +1,70 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++---
++name: "yaml-topology"
++
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++ # ...
++
++# bolt definitions
++bolts:
++ - id: "bolt-1"
++ className: "org.apache.storm.flux.test.TestBolt"
++ parallelism: 1
++ constructorArgs:
++ - FOO # enum class
++ - 1.0
++ configMethods:
++ - name: "withFoo"
++ args:
++ - "foo"
++ - name: "withBar"
++ args:
++ - "bar"
++ - name: "withFooBar"
++ args:
++ - "foo"
++ - "bar"
++
++
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++streams:
++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
++# id: "connection-1"
++ from: "spout-1"
++ to: "bolt-1"
++ grouping:
++ type: SHUFFLE
++
++
++
++
++
++
++
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
index 0000000,0000000..6f3c88a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-method-override.yaml
@@@ -1,0 -1,0 +1,10 @@@
++---
++
++# configuration that uses an existing topology that does not implement TopologySource
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopology"
++ methodName: "getTopologyWithDifferentMethodName"
++ constructorArgs:
++ - "foo"
++ - "bar"
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
index 0000000,0000000..8af8a84
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection-config.yaml
@@@ -1,0 -1,0 +1,9 @@@
++---
++
++# configuration that uses an existing topology that does not implement TopologySource
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopologyWithConfigParam"
++ constructorArgs:
++ - "foo"
++ - "bar"
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml
index 0000000,0000000..dd3e1e8
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-reflection.yaml
@@@ -1,0 -1,0 +1,9 @@@
++---
++
++# configuration that uses an existing topology that does not implement TopologySource
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopology"
++ constructorArgs:
++ - "foo"
++ - "bar"
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml
index 0000000,0000000..5ac682c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/existing-topology-trident.yaml
@@@ -1,0 -1,0 +1,9 @@@
++---
++
++name: "existing-topology"
++
++config:
++ topology.workers: 1
++
++topologySource:
++ className: "org.apache.storm.flux.test.TridentTopologySource"
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/existing-topology.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/existing-topology.yaml
index 0000000,0000000..fa6a0b3
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/existing-topology.yaml
@@@ -1,0 -1,0 +1,8 @@@
++---
++
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopologySource"
++ constructorArgs:
++ - "foo"
++ - "bar"
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml
index 0000000,0000000..8fe0a9a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/hdfs_test.yaml
@@@ -1,0 -1,0 +1,97 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test ability to configure an HDFS bolt using components and config methods
++---
++
++# topology definition
++# name to be used when submitting
++name: "hdfs-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++components:
++ - id: "syncPolicy"
++ className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
++ constructorArgs:
++ - 1000
++ - id: "rotationPolicy"
++ className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
++ constructorArgs:
++ - 5.0
++ - MB
++
++ - id: "fileNameFormat"
++ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
++ configMethods:
++ - name: "withPath"
++ args: ["/tmp/foo/"]
++ - name: "withExtension"
++ args: [".txt"]
++
++ - id: "recordFormat"
++ className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
++ configMethods:
++ - name: "withFieldDelimiter"
++ args: ["|"]
++
++ - id: "rotationAction"
++ className: "org.apache.storm.hdfs.common.rotation.MoveFileAction"
++ configMethods:
++ - name: "toDestination"
++ args: ["/tmp/dest2"]
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++ # ...
++
++# bolt definitions
++
++# HdfsBolt bolt = new HdfsBolt()
++# .withConfigKey("hdfs.config")
++# .withFsUrl(args[0])
++# .withFileNameFormat(fileNameFormat)
++# .withRecordFormat(format)
++# .withRotationPolicy(rotationPolicy)
++# .withSyncPolicy(syncPolicy)
++# .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));
++bolts:
++ - id: "bolt-1"
++ className: "org.apache.storm.hdfs.bolt.HdfsBolt"
++ configMethods:
++ - name: "withConfigKey"
++ args: ["hdfs.config"]
++ - name: "withFsUrl"
++ args: ["hdfs://hadoop:54310"]
++ - name: "withFileNameFormat"
++ args: [ref: "fileNameFormat"]
++ - name: "withRecordFormat"
++ args: [ref: "recordFormat"]
++ - name: "withRotationPolicy"
++ args: [ref: "rotationPolicy"]
++ - name: "withSyncPolicy"
++ args: [ref: "syncPolicy"]
++ - name: "addRotationAction"
++ args: [ref: "rotationAction"]
++ parallelism: 1
++ # ...
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/include_test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/include_test.yaml
index 0000000,0000000..702f590
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/include_test.yaml
@@@ -1,0 -1,0 +1,25 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test includes by defining nothing, and simply override the topology name
++---
++
++name: "include-topology"
++
++includes:
++ - resource: true
++ file: "/configs/shell_test.yaml"
++ override: false #otherwise subsequent includes that define 'name' would override
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml
index 0000000,0000000..72128df
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/invalid-existing-topology.yaml
@@@ -1,0 -1,0 +1,17 @@@
++# This is an invalid config. It defines both a topologySource and a list of spouts.
++---
++
++name: "existing-topology"
++topologySource:
++ className: "org.apache.storm.flux.test.SimpleTopologySource"
++
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/kafka_test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/kafka_test.yaml
index 0000000,0000000..17cd8e2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/kafka_test.yaml
@@@ -1,0 -1,0 +1,126 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++---
++
++# topology definition
++# name to be used when submitting
++name: "kafka-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++components:
++ - id: "stringScheme"
++ className: "storm.kafka.StringScheme"
++
++ - id: "stringMultiScheme"
++ className: "backtype.storm.spout.SchemeAsMultiScheme"
++ constructorArgs:
++ - ref: "stringScheme"
++
++ - id: "zkHosts"
++ className: "storm.kafka.ZkHosts"
++ constructorArgs:
++ - "localhost:2181"
++
++# Alternative kafka config
++# - id: "kafkaConfig"
++# className: "storm.kafka.KafkaConfig"
++# constructorArgs:
++# # brokerHosts
++# - ref: "zkHosts"
++# # topic
++# - "myKafkaTopic"
++# # clientId (optional)
++# - "myKafkaClientId"
++
++ - id: "spoutConfig"
++ className: "storm.kafka.SpoutConfig"
++ constructorArgs:
++ # brokerHosts
++ - ref: "zkHosts"
++ # topic
++ - "myKafkaTopic"
++ # zkRoot
++ - "/kafkaSpout"
++ # id
++ - "myId"
++ properties:
++ - name: "forceFromStart"
++ value: true
++ - name: "scheme"
++ ref: "stringMultiScheme"
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "kafka-spout"
++ className: "storm.kafka.KafkaSpout"
++ constructorArgs:
++ - ref: "spoutConfig"
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "kafka-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/shell_test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/shell_test.yaml
index 0000000,0000000..b473fa7
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/shell_test.yaml
@@@ -1,0 -1,0 +1,104 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test ability to wire together shell spouts/bolts
++---
++
++# topology definition
++# name to be used when submitting
++name: "shell-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#components:
++# - id: "myComponent"
++# className: "com.foo.bar.MyComponent"
++# constructorArgs:
++# - ...
++# properties:
++# foo: "bar"
++# bar: "foo"
++
++# NOTE: We may want to consider some level of spring integration. For example, allowing component references
++# to a spring `ApplicationContext`.
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "sentence-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml
index 0000000,0000000..e407bd9
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/simple_hbase.yaml
@@@ -1,0 -1,0 +1,120 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test ability to wire a word count topology to an HBase bolt
++---
++
++# topology definition
++# name to be used when submitting
++name: "hbase-wordcount"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++
++# WordSpout spout = new WordSpout();
++# WordCounter bolt = new WordCounter();
++#
++# SimpleHBaseMapper mapper = new SimpleHBaseMapper()
++# .withRowKeyField("word")
++# .withColumnFields(new Fields("word"))
++# .withCounterFields(new Fields("count"))
++# .withColumnFamily("cf");
++#
++# HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
++# .withConfigKey("hbase.conf");
++#
++#
++# // wordSpout ==> countBolt ==> HBaseBolt
++# TopologyBuilder builder = new TopologyBuilder();
++#
++# builder.setSpout(WORD_SPOUT, spout, 1);
++# builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
++# builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
++
++
++
++
++components:
++ - id: "columnFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ - ["word"]
++
++ - id: "counterFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ - ["count"]
++
++ - id: "mapper"
++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
++ configMethods:
++ - name: "withRowKeyField"
++ args: ["word"]
++ - name: "withColumnFields"
++ args: [ref: "columnFields"]
++ - name: "withCounterFields"
++ args: [ref: "counterFields"]
++ - name: "withColumnFamily"
++ args: ["cf"]
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ hbase.conf:
++ hbase.rootdir: "hdfs://hadoop:54310/hbase"
++ hbase.zookeeper.quorum: "hadoop"
++
++# spout definitions
++spouts:
++ - id: "word-spout"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++
++# bolt definitions
++
++bolts:
++ - id: "count-bolt"
++ className: "backtype.storm.testing.TestWordCounter"
++
++ - id: "hbase-bolt"
++ className: "org.apache.storm.hbase.bolt.HBaseBolt"
++ constructorArgs:
++ - "WordCount" # HBase table name
++ - ref: "mapper"
++ configMethods:
++ - name: "withConfigKey"
++ args: ["hbase.conf"]
++ parallelism: 1
++
++
++streams:
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "word-spout"
++ to: "count-bolt"
++ grouping:
++ type: SHUFFLE
++
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "count-bolt"
++ to: "hbase-bolt"
++ grouping:
++ type: FIELDS
++ args: ["word"]
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/substitution-test.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 0000000,0000000..13f1960
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@@ -1,0 -1,0 +1,106 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test property and environment variable substitution in a topology definition
++---
++
++# topology definition
++# name to be used when submitting
++name: "${topology.name}"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#components:
++# - id: "myComponent"
++# className: "com.foo.bar.MyComponent"
++# constructorArgs:
++# - ...
++# properties:
++# foo: "bar"
++# bar: "foo"
++
++# NOTE: We may want to consider some level of spring integration. For example, allowing component references
++# to a spring `ApplicationContext`.
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # test environment variable substitution
++ test.env.value: ${ENV-PATH}
++ # ...
++
++# spout definitions
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "sentence-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/tck.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/tck.yaml
index 0000000,0000000..7e9b614
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/tck.yaml
@@@ -1,0 -1,0 +1,95 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++
++# YAML configuration to serve as a basic smoke test for what is supported.
++#
++# We should support comments, so if we've failed so far, things aren't good.
++
++# we shouldn't choke if we see a document separator...
++---
++
++# topology definition
++# name to be used when submitting
++name: "yaml-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#components:
++# - id: "myComponent"
++# className: "com.foo.bar.MyComponent"
++# properties:
++# foo: "bar"
++# bar: "foo"
++
++# NOTE: We may want to consider some level of spring integration. For example, allowing component references
++# to a spring `ApplicationContext`.
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++ # ...
++
++# bolt definitions
++bolts:
++ - id: "bolt-1"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++
++ - id: "bolt-2"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++streams:
++ - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
++# id: "connection-1"
++ from: "spout-1"
++ to: "bolt-1"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "bolt-1 --> bolt2"
++ from: "bolt-1"
++ to: "bolt-2"
++ grouping:
++ type: CUSTOM
++ customClass:
++ className: "backtype.storm.testing.NGrouping"
++ constructorArgs:
++ - 1
++
++
++
++
++
++
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/configs/test.properties
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/configs/test.properties
index 0000000,0000000..0730d5f
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/configs/test.properties
@@@ -1,0 -1,0 +1,2 @@@
++topology.name: substitution-topology
++some.other.property: foo bar
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-core/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-core/src/test/resources/logback.xml
index 0000000,0000000..1853b8a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-core/src/test/resources/logback.xml
@@@ -1,0 -1,0 +1,30 @@@
++<?xml version="1.0"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<configuration scan="true" scanPeriod="30 seconds">
++ <appender name="A1" class="ch.qos.logback.core.ConsoleAppender">
++ <encoder>
++ <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
++ </encoder>
++ </appender>
++ <logger name="org.apache.storm.zookeeper" level="WARN"/>
++ <logger name="org.apache.storm.curator" level="WARN"/>
++ <logger name="org.apache.storm.flux" level="DEBUG"/>
++ <root level="DEBUG">
++ <appender-ref ref="A1"/>
++ </root>
++</configuration>
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/README.md
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/README.md
index 0000000,0000000..b3798a6
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/README.md
@@@ -1,0 -1,0 +1,68 @@@
++# Flux Examples
++A collection of examples illustrating various capabilities.
++
++## Building From Source and Running
++
++Check out the project's source and perform a top-level Maven build (i.e. from the `flux` directory):
++
++```bash
++git clone https://github.com/ptgoetz/flux.git
++cd flux
++mvn install
++```
++
++This will create a shaded (i.e. "fat" or "uber") jar in the `flux-examples/target` directory that can be run/deployed with
++the `storm` command:
++
++```bash
++cd flux-examples
++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_wordcount.yaml
++```
++
++The example YAML files are also packaged in the examples jar, so they can also be referenced with Flux's `--resource`
++command line switch:
++
++```bash
++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local --resource /simple_wordcount.yaml
++```
++
++## Available Examples
++
++### [simple_wordcount.yaml](src/main/resources/simple_wordcount.yaml)
++
++This is a very basic wordcount example using Java spouts and bolts. It simply logs the running count of each word
++received.
++
++### [multilang.yaml](src/main/resources/multilang.yaml)
++
++Another wordcount example that uses a spout written in JavaScript (node.js), a bolt written in Python, and two bolts
++written in Java.
++
++### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml)
++This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`,
++and `constructor arguments` constructs.
++
++### [simple_hdfs.yaml](src/main/resources/simple_hdfs.yaml)
++
++This example demonstrates using Flux to set up a storm-hdfs bolt to write to an HDFS cluster. It also demonstrates Flux's
++variable substitution/filtering feature.
++
++To run the `simple_hdfs.yaml` example, copy the `hdfs_bolt.properties` file to a convenient location and change, at
++least, the property `hdfs.url` to point to an HDFS cluster. Then you can run the example with something like:
++
++```bash
++storm jar ./target/flux-examples-0.2.3-SNAPSHOT.jar org.apache.storm.flux.Flux --local ./src/main/resources/simple_hdfs.yaml --filter my_hdfs_bolt.properties
++```
++
++### [simple_hbase.yaml](src/main/resources/simple_hbase.yaml)
++
++This example illustrates how to use Flux to set up a storm-hbase bolt to write to HBase.
++
++In order to use this example, you will need to edit the `src/main/resources/hbase-site.xml` file to reflect your HBase
++environment, and then rebuild the topology jar.
++
++You can do so by running the following Maven command in the `flux-examples` directory:
++
++```bash
++mvn clean install
++```
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/pom.xml
index 0000000,0000000..0b9796e
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/pom.xml
@@@ -1,0 -1,0 +1,87 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++ <modelVersion>4.0.0</modelVersion>
++
++ <parent>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux</artifactId>
++ <version>0.3.1-SNAPSHOT</version>
++ <relativePath>../pom.xml</relativePath>
++ </parent>
++
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-examples</artifactId>
++ <packaging>jar</packaging>
++
++ <name>flux-examples</name>
++ <url>https://github.com/ptgoetz/flux</url>
++
++ <dependencies>
++ <dependency>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-core</artifactId>
++ <version>${project.version}</version>
++ </dependency>
++ <dependency>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-wrappers</artifactId>
++ <version>${project.version}</version>
++ </dependency>
++
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-hdfs</artifactId>
++ <version>${storm.version}</version>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-hbase</artifactId>
++ <version>${storm.version}</version>
++ </dependency>
++ </dependencies>
++
++ <build>
++ <plugins>
++ <plugin>
++ <groupId>org.apache.maven.plugins</groupId>
++ <artifactId>maven-shade-plugin</artifactId>
++ <version>1.4</version>
++ <configuration>
++ <createDependencyReducedPom>true</createDependencyReducedPom>
++ </configuration>
++ <executions>
++ <execution>
++ <phase>package</phase>
++ <goals>
++ <goal>shade</goal>
++ </goals>
++ <configuration>
++ <transformers>
++ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
++ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
++ <mainClass>org.apache.storm.flux.Flux</mainClass>
++ </transformer>
++ </transformers>
++ </configuration>
++ </execution>
++ </executions>
++ </plugin>
++ </plugins>
++ </build>
++</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
index 0000000,0000000..eb4fb7a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@@ -1,0 -1,0 +1,74 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.examples;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hbase.HBaseConfiguration;
++import org.apache.hadoop.hbase.client.Get;
++import org.apache.hadoop.hbase.client.HTable;
++import org.apache.hadoop.hbase.client.Result;
++import org.apache.hadoop.hbase.util.Bytes;
++
++import java.io.FileInputStream;
++import java.util.Properties;
++
++/**
++ * Connects to the 'WordCount' HBase table and prints counts for each word.
++ *
++ * Assumes you have run (or are running) the YAML topology definition in
++ * <code>simple_hbase.yaml</code>
++ *
++ * The single command line argument is the path to a properties file; every
++ * entry in that file is copied into the HBase client configuration before
++ * the table is opened.
++ *
++ * You will also need to modify `src/main/resources/hbase-site.xml`
++ * to point to your HBase instance, and then repackage with `mvn package`.
++ * This is a known issue.
++ *
++ */
++public class WordCountClient {
++
++    /**
++     * Loads the HBase settings from the given properties file, looks up a
++     * fixed list of words in the 'WordCount' table and prints the value of
++     * the <code>cf:count</code> column for each of them.
++     *
++     * @param args exactly one element: the path to the properties file
++     * @throws Exception if the properties file cannot be read or an HBase
++     *                   call fails
++     */
++    public static void main(String[] args) throws Exception {
++        // Validate the arguments before touching HBase at all.
++        if (args.length != 1) {
++            System.out.println("Usage: WordCountClient <hbase_config.properties>");
++            System.exit(1);
++            return; // not reached at runtime; keeps the early exit explicit
++        }
++
++        Configuration config = HBaseConfiguration.create();
++
++        Properties props = new Properties();
++        // Close the stream even when loading fails so the file handle is not leaked.
++        FileInputStream in = new FileInputStream(args[0]);
++        try {
++            props.load(in);
++        } finally {
++            in.close();
++        }
++
++        System.out.println("HBase configuration:");
++        for (String key : props.stringPropertyNames()) {
++            String value = props.getProperty(key);
++            System.out.println(key + "=" + value);
++            config.set(key, value);
++        }
++
++        // Loop-invariant column coordinates, encoded once.
++        byte[] family = Bytes.toBytes("cf");
++        byte[] countQualifier = Bytes.toBytes("count");
++        byte[] wordQualifier = Bytes.toBytes("word");
++        String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
++
++        HTable table = new HTable(config, "WordCount");
++        try {
++            for (String word : words) {
++                Result result = table.get(new Get(Bytes.toBytes(word)));
++
++                byte[] countBytes = result.getValue(family, countQualifier);
++                if (countBytes == null) {
++                    // No row (or no counter cell) for this word yet; decoding
++                    // a null value would throw, so report it and move on.
++                    System.out.println("Word: '" + word + "', Count: <not found>");
++                    continue;
++                }
++
++                byte[] wordBytes = result.getValue(family, wordQualifier);
++                // Fall back to the row key when the 'word' cell is absent.
++                String wordStr = (wordBytes != null) ? Bytes.toString(wordBytes) : word;
++                long count = Bytes.toLong(countBytes);
++                System.out.println("Word: '" + wordStr + "', Count: " + count);
++            }
++        } finally {
++            // Release the table's client resources on every exit path.
++            table.close();
++        }
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
index 0000000,0000000..f7c80c7
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCounter.java
@@@ -1,0 -1,0 +1,71 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.examples;
++
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.IBasicBolt;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.Map;
++
++import static backtype.storm.utils.Utils.tuple;
++
++/**
++ * This bolt is used by the HBase example. It simply emits the first field
++ * found in the incoming tuple as "word", with a "count" of `1`.
++ *
++ * In this case, the downstream HBase bolt handles the counting, so a value
++ * of `1` will just increment the HBase counter by one.
++ */
++public class WordCounter extends BaseBasicBolt {
++ private static final Logger LOG = LoggerFactory.getLogger(WordCounter.class);
++
++
++
++ @SuppressWarnings("rawtypes")
++ public void prepare(Map stormConf, TopologyContext context) {
++ }
++
++ /*
++ * Just output the word value with a count of 1.
++ * The HBaseBolt will handle incrementing the counter.
++ */
++ public void execute(Tuple input, BasicOutputCollector collector) {
++ collector.emit(tuple(input.getValues().get(0), 1));
++ }
++
++ public void cleanup() {
++
++ }
++
++ public void declareOutputFields(OutputFieldsDeclarer declarer) {
++ declarer.declare(new Fields("word", "count"));
++ }
++
++ @Override
++ public Map<String, Object> getComponentConfiguration() {
++ return null;
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/hbase_bolt.properties
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/hbase_bolt.properties
index 0000000,0000000..f8ed50c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/hbase_bolt.properties
@@@ -1,0 -1,0 +1,18 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++hbase.rootdir=hdfs://hadoop:54310/hbase
++hbase.zookeeper.quorum=hadoop
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/hdfs_bolt.properties
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/hdfs_bolt.properties
index 0000000,0000000..7bcbe7a
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/hdfs_bolt.properties
@@@ -1,0 -1,0 +1,26 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++
++# The HDFS url
++hdfs.url=hdfs://hadoop:54310
++
++# The HDFS directory where the bolt will write incoming data
++hdfs.write.dir=/incoming
++
++# The HDFS directory where files will be moved once the bolt has
++# finished writing to it.
++hdfs.dest.dir=/complete
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/kafka_spout.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/kafka_spout.yaml
index 0000000,0000000..8ffddc5
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/kafka_spout.yaml
@@@ -1,0 -1,0 +1,136 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++
++# Example topology: reads from a Kafka spout and feeds a multilang (shell) bolt
++---
++
++# topology definition
++# name to be used when submitting
++name: "kafka-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++components:
++ - id: "stringScheme"
++ className: "storm.kafka.StringScheme"
++
++ - id: "stringMultiScheme"
++ className: "backtype.storm.spout.SchemeAsMultiScheme"
++ constructorArgs:
++ - ref: "stringScheme"
++
++ - id: "zkHosts"
++ className: "storm.kafka.ZkHosts"
++ constructorArgs:
++ - "localhost:2181"
++
++# Alternative kafka config
++# - id: "kafkaConfig"
++# className: "storm.kafka.KafkaConfig"
++# constructorArgs:
++# # brokerHosts
++# - ref: "zkHosts"
++# # topic
++# - "myKafkaTopic"
++# # clientId (optional)
++# - "myKafkaClientId"
++
++ - id: "spoutConfig"
++ className: "storm.kafka.SpoutConfig"
++ constructorArgs:
++ # brokerHosts
++ - ref: "zkHosts"
++ # topic
++ - "myKafkaTopic"
++ # zkRoot
++ - "/kafkaSpout"
++ # id
++ - "myId"
++ properties:
++ - name: "forceFromStart"
++ value: true
++ - name: "scheme"
++ ref: "stringMultiScheme"
++
++
++
++# NOTE: We may want to consider some level of spring integration. For example, allowing component references
++# to a spring `ApplicationContext`.
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "kafka-spout"
++ className: "storm.kafka.KafkaSpout"
++ constructorArgs:
++ - ref: "spoutConfig"
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++
++# stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "kafka-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/multilang.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/multilang.yaml
index 0000000,0000000..4f80667
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/multilang.yaml
@@@ -1,0 -1,0 +1,89 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Test ability to wire together shell spouts/bolts
++---
++
++# topology definition
++# name to be used when submitting
++name: "shell-topology"
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++ # ...
++
++# spout definitions
++spouts:
++ - id: "sentence-spout"
++ className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
++ # shell spout constructor takes 2 arguments: String[], String[]
++ constructorArgs:
++ # command line
++ - ["node", "randomsentence.js"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++# bolt definitions
++bolts:
++ - id: "splitsentence"
++ className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
++ constructorArgs:
++ # command line
++ - ["python", "splitsentence.py"]
++ # output fields
++ - ["word"]
++ parallelism: 1
++ # ...
++
++ - id: "log"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++ # ...
++
++ - id: "count"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++ # ...
++
++# stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++# custom stream groupings are also supported
++
++streams:
++ - name: "spout --> split" # name isn't used (placeholder for logging, UI, etc.)
++ from: "sentence-spout"
++ to: "splitsentence"
++ grouping:
++ type: SHUFFLE
++
++ - name: "split --> count"
++ from: "splitsentence"
++ to: "count"
++ grouping:
++ type: FIELDS
++ args: ["word"]
++
++ - name: "count --> log"
++ from: "count"
++ to: "log"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_hbase.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/simple_hbase.yaml
index 0000000,0000000..62686d0
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_hbase.yaml
@@@ -1,0 -1,0 +1,92 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++---
++# NOTE: To use this example, you will need to modify `src/main/resources/hbase-site.xml`
++# to point to your HBase instance, and then repackage with `mvn package`.
++# This is a known issue.
++
++# topology definition
++# name to be used when submitting
++name: "hbase-persistent-wordcount"
++
++# Components
++components:
++ - id: "columnFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ - ["word"]
++
++ - id: "counterFields"
++ className: "backtype.storm.tuple.Fields"
++ constructorArgs:
++ - ["count"]
++
++ - id: "mapper"
++ className: "org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper"
++ configMethods:
++ - name: "withRowKeyField"
++ args: ["word"]
++ - name: "withColumnFields"
++ args: [ref: "columnFields"]
++ - name: "withCounterFields"
++ args: [ref: "counterFields"]
++ - name: "withColumnFamily"
++ args: ["cf"]
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++config:
++ topology.workers: 1
++ hbase.conf:
++ hbase.rootdir: "${hbase.rootdir}"
++ hbase.zookeeper.quorum: "${hbase.zookeeper.quorum}"
++
++# spout definitions
++spouts:
++ - id: "word-spout"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++
++# bolt definitions
++
++bolts:
++ - id: "count-bolt"
++ className: "org.apache.storm.flux.examples.WordCounter"
++ parallelism: 1
++
++ - id: "hbase-bolt"
++ className: "org.apache.storm.hbase.bolt.HBaseBolt"
++ constructorArgs:
++ - "WordCount" # HBase table name
++ - ref: "mapper"
++ configMethods:
++ - name: "withConfigKey"
++ args: ["hbase.conf"]
++ parallelism: 1
++
++streams:
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "word-spout"
++ to: "count-bolt"
++ grouping:
++ type: SHUFFLE
++
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "count-bolt"
++ to: "hbase-bolt"
++ grouping:
++ type: FIELDS
++ args: ["word"]