You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/03 19:35:45 UTC

[31/50] [abbrv] storm git commit: merge flux into external/flux/

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
index 0000000,0000000..9007869
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
@@@ -1,0 -1,0 +1,105 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements.  See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership.  The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License.  You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Example topology: streams a test word spout to an HdfsBolt and a logging bolt
++---
++
++# topology definition
++# name to be used when submitting
++name: "hdfs-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++components:
++  - id: "syncPolicy"
++    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
++    constructorArgs:
++      - 1000
++  - id: "rotationPolicy"
++    className: "org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy"
++    constructorArgs:
++      - 30
++      - SECONDS
++
++  - id: "fileNameFormat"
++    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
++    configMethods:
++      - name: "withPath"
++        args: ["${hdfs.write.dir}"]
++      - name: "withExtension"
++        args: [".txt"]
++
++  - id: "recordFormat"
++    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
++    configMethods:
++      - name: "withFieldDelimiter"
++        args: ["|"]
++
++  - id: "rotationAction"
++    className: "org.apache.storm.hdfs.common.rotation.MoveFileAction"
++    configMethods:
++      - name: "toDestination"
++        args: ["${hdfs.dest.dir}"]
++
++# spout definitions
++spouts:
++  - id: "spout-1"
++    className: "backtype.storm.testing.TestWordSpout"
++    parallelism: 1
++    # ...
++
++# bolt definitions
++
++bolts:
++  - id: "bolt-1"
++    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
++    configMethods:
++      - name: "withConfigKey"
++        args: ["hdfs.config"]
++      - name: "withFsUrl"
++        args: ["${hdfs.url}"]
++      - name: "withFileNameFormat"
++        args: [ref: "fileNameFormat"]
++      - name: "withRecordFormat"
++        args: [ref: "recordFormat"]
++      - name: "withRotationPolicy"
++        args: [ref: "rotationPolicy"]
++      - name: "withSyncPolicy"
++        args: [ref: "syncPolicy"]
++      - name: "addRotationAction"
++        args: [ref: "rotationAction"]
++    parallelism: 1
++    # ...
++
++  - id: "bolt-2"
++    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++    parallelism: 1
++
++streams:
++  - name: "" # name isn't used (placeholder for logging, UI, etc.)
++    from: "spout-1"
++    to: "bolt-1"
++    grouping:
++      type: SHUFFLE
++
++  - name: "" # name isn't used (placeholder for logging, UI, etc.)
++    from: "spout-1"
++    to: "bolt-2"
++    grouping:
++      type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
index 0000000,0000000..380f9d2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
@@@ -1,0 -1,0 +1,68 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements.  See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership.  The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License.  You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++---
++
++# topology definition
++# name to be used when submitting
++name: "yaml-topology"
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++  topology.workers: 1
++
++# spout definitions
++spouts:
++  - id: "spout-1"
++    className: "backtype.storm.testing.TestWordSpout"
++    parallelism: 1
++
++# bolt definitions
++bolts:
++  - id: "bolt-1"
++    className: "backtype.storm.testing.TestWordCounter"
++    parallelism: 1
++
++  - id: "bolt-2"
++    className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++    parallelism: 1
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++streams:
++  - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.)
++#    id: "connection-1"
++    from: "spout-1"
++    to: "bolt-1"
++    grouping:
++      type: FIELDS
++      args: ["word"]
++
++  - name: "bolt-1 --> bolt2"
++    from: "bolt-1"
++    to: "bolt-2"
++    grouping:
++      type: SHUFFLE
++
++
++
++
++
++
++

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-ui/README.md
----------------------------------------------------------------------
diff --cc external/flux/flux-ui/README.md
index 0000000,0000000..8b6bd5f
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-ui/README.md
@@@ -1,0 -1,0 +1,3 @@@
++# Flux-UI
++
++Placeholder for Flux GUI

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/pom.xml
index 0000000,0000000..6784141
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/pom.xml
@@@ -1,0 -1,0 +1,35 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements.  See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License.  You may obtain a copy of the License at
++
++     http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++    <modelVersion>4.0.0</modelVersion>
++
++    <parent>
++        <groupId>com.github.ptgoetz</groupId>
++        <artifactId>flux</artifactId>
++        <version>0.3.1-SNAPSHOT</version>
++        <relativePath>../pom.xml</relativePath>
++    </parent>
++
++    <groupId>com.github.ptgoetz</groupId>
++    <artifactId>flux-wrappers</artifactId>
++    <packaging>jar</packaging>
++
++    <name>flux-wrappers</name>
++    <url>https://github.com/ptgoetz/flux</url>
++
++</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
index 0000000,0000000..4e0f91c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
@@@ -1,0 -1,0 +1,56 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.wrappers.bolts;
++
++import backtype.storm.task.ShellBolt;
++import backtype.storm.topology.IRichBolt;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++
++import java.util.Map;
++
++/**
++ * A generic `ShellBolt` implementation that allows you to specify output fields
++ * without having to subclass `ShellBolt` to do so.
++ *
++ */
++public class FluxShellBolt extends ShellBolt implements IRichBolt{
++    private String[] outputFields;
++    private Map<String, Object> componentConfig;
++
++    /**
++     * Create a ShellBolt with command line arguments and output fields
++     * @param command Command line arguments for the bolt
++     * @param outputFields Names of fields the bolt will emit (if any).
++     */
++
++    public FluxShellBolt(String[] command, String[] outputFields){
++        super(command);
++        this.outputFields = outputFields;
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        declarer.declare(new Fields(this.outputFields));
++    }
++
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++        return this.componentConfig;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
index 0000000,0000000..a42d7c3
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
@@@ -1,0 -1,0 +1,44 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.storm.flux.wrappers.bolts;
++
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Tuple;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * Simple bolt that does nothing other than LOG.info() every tuple received.
++ *
++ */
++public class LogInfoBolt extends BaseBasicBolt {
++    private static final Logger LOG = LoggerFactory.getLogger(LogInfoBolt.class);
++
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
++       LOG.info("{}", tuple);
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
++
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
index 0000000,0000000..c7e9058
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
@@@ -1,0 -1,0 +1,55 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.wrappers.spouts;
++
++import backtype.storm.spout.ShellSpout;
++import backtype.storm.topology.IRichSpout;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++
++import java.util.Map;
++
++/**
++ * A generic `ShellSpout` implementation that allows you to specify output fields
++ * without having to subclass `ShellSpout` to do so.
++ *
++ */
++public class FluxShellSpout extends ShellSpout implements IRichSpout {
++    private String[] outputFields;
++    private Map<String, Object> componentConfig;
++
++    /**
++     * Create a ShellSpout with command line arguments and output fields
++     * @param args Command line arguments for the spout
++     * @param outputFields Names of fields the spout will emit.
++     */
++    public FluxShellSpout(String[] args, String[] outputFields){
++        super(args);
++        this.outputFields = outputFields;
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        declarer.declare(new Fields(this.outputFields));
++    }
++
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++        return this.componentConfig;
++    }
++}

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
index 0000000,0000000..36fc5f5
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
@@@ -1,0 -1,0 +1,93 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++/**
++ * Example for storm spout. Emits random sentences.
++ * The original class in java - storm.starter.spout.RandomSentenceSpout.
++ *
++ */
++
++var storm = require('./storm');
++var Spout = storm.Spout;
++
++
++var SENTENCES = [
++    "the cow jumped over the moon",
++    "an apple a day keeps the doctor away",
++    "four score and seven years ago",
++    "snow white and the seven dwarfs",
++    "i am at two with nature"]
++
++function RandomSentenceSpout(sentences) {
++    Spout.call(this);
++    this.runningTupleId = 0;
++    this.sentences = sentences;
++    this.pending = {};
++};
++
++RandomSentenceSpout.prototype = Object.create(Spout.prototype);
++RandomSentenceSpout.prototype.constructor = RandomSentenceSpout;
++
++RandomSentenceSpout.prototype.getRandomSentence = function() {
++    return this.sentences[getRandomInt(0, this.sentences.length - 1)];
++}
++
++RandomSentenceSpout.prototype.nextTuple = function(done) {
++    var self = this;
++    var sentence = this.getRandomSentence();
++    var tup = [sentence];
++    var id = this.createNextTupleId();
++    this.pending[id] = tup;
++    //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100
++    setTimeout(function() {
++        self.emit({tuple: tup, id: id}, function(taskIds) {
++            self.log(tup + ' sent to task ids - ' + taskIds);
++        });
++        done();
++    },100);
++}
++
++RandomSentenceSpout.prototype.createNextTupleId = function() {
++    var id = this.runningTupleId;
++    this.runningTupleId++;
++    return id;
++}
++
++RandomSentenceSpout.prototype.ack = function(id, done) {
++    this.log('Received ack for - ' + id);
++    delete this.pending[id];
++    done();
++}
++
++RandomSentenceSpout.prototype.fail = function(id, done) {
++    var self = this;
++    this.log('Received fail for - ' + id + '. Retrying.');
++    this.emit({tuple: this.pending[id], id:id}, function(taskIds) {
++        self.log(self.pending[id] + ' sent to task ids - ' + taskIds);
++    });
++    done();
++}
++
++/**
++ * Returns a random integer between min (inclusive) and max (inclusive)
++ */
++function getRandomInt(min, max) {
++    return Math.floor(Math.random() * (max - min + 1)) + min;
++}
++
++new RandomSentenceSpout(SENTENCES).run();

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
index 0000000,0000000..300105f
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
@@@ -1,0 -1,0 +1,24 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements.  See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership.  The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License.  You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++import storm
++
++class SplitSentenceBolt(storm.BasicBolt):
++    def process(self, tup):
++        words = tup.values[0].split(" ")
++        for word in words:
++          storm.emit([word])
++
++SplitSentenceBolt().run()

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.js
index 0000000,0000000..355c2d2
new file mode 100755
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.js
@@@ -1,0 -1,0 +1,373 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++/**
++ * Base classes in node-js for storm Bolt and Spout.
++ * Implements the storm multilang protocol for nodejs.
++ */
++
++
++var fs = require('fs');
++
++function Storm() {
++    this.messagePart = "";
++    this.taskIdsCallbacks = [];
++    this.isFirstMessage = true;
++    this.separator = '\nend\n';
++}
++
++Storm.prototype.sendMsgToParent = function(msg) {
++    var str = JSON.stringify(msg);
++    process.stdout.write(str + this.separator);
++}
++
++Storm.prototype.sync = function() {
++    this.sendMsgToParent({"command":"sync"});
++}
++
++Storm.prototype.sendPid = function(heartbeatdir) {
++    var pid = process.pid;
++    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
++    this.sendMsgToParent({"pid": pid})
++}
++
++Storm.prototype.log = function(msg) {
++    this.sendMsgToParent({"command": "log", "msg": msg});
++}
++
++Storm.prototype.initSetupInfo = function(setupInfo) {
++    var self = this;
++    var callback = function() {
++        self.sendPid(setupInfo['pidDir']);
++    }
++    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
++}
++
++Storm.prototype.startReadingInput = function() {
++    var self = this;
++    process.stdin.on('readable', function() {
++        var chunk = process.stdin.read();
++        var messages = self.handleNewChunk(chunk);
++        messages.forEach(function(message) {
++            self.handleNewMessage(message);
++        })
++
++    });
++}
++
++/**
++ * receives a new string chunk and returns a list of new messages with the separator removed
++ * stores state in this.messagePart
++ * @param chunk
++ */
++Storm.prototype.handleNewChunk = function(chunk) {
++    //invariant: this.messagePart has no separator otherwise we would have parsed it already
++    var messages = [];
++    if (chunk && chunk.length !== 0) {
++        //"{}".split("\nend\n")           ==> ['{}']
++        //"\nend\n".split("\nend\n")      ==> [''  , '']
++        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
++        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
++        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
++        this.messagePart = this.messagePart + chunk;
++        var newMessageParts = this.messagePart.split(this.separator);
++        while (newMessageParts.length > 0) {
++            var potentialMessage = newMessageParts.shift();
++            var anotherMessageAhead = newMessageParts.length > 0;
++            if  (!anotherMessageAhead) {
++                this.messagePart = potentialMessage;
++            }
++            else if (potentialMessage.length > 0) {
++                messages.push(potentialMessage);
++            }
++        }
++    }
++    return messages;
++}
++
++Storm.prototype.isTaskIds = function(msg) {
++    return (msg instanceof Array);
++}
++
++Storm.prototype.handleNewMessage = function(msg) {
++    var parsedMsg = JSON.parse(msg);
++
++    if (this.isFirstMessage) {
++        this.initSetupInfo(parsedMsg);
++        this.isFirstMessage = false;
++    } else if (this.isTaskIds(parsedMsg)) {
++        this.handleNewTaskId(parsedMsg);
++    } else {
++        this.handleNewCommand(parsedMsg);
++    }
++}
++
++Storm.prototype.handleNewTaskId = function(taskIds) {
++    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
++    //Storm assures that the task ids will be sent in the same order as their corresponding emits so we can simply
++    //take the first callback in the list and be sure it is the right one.
++
++    var callback = this.taskIdsCallbacks.shift();
++    if (callback) {
++        callback(taskIds);
++    } else {
++        throw new Error('Something went wrong, we off the split of task id callbacks');
++    }
++}
++
++
++
++/**
++ *
++ * @param messageDetails json with the emit details.
++ *
++ * For bolt, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
++ * tuple and return ack when all components have successfully finished processing it.
++ * and may contain the optional fields:
++ * - stream (if empty - emit to default stream)
++ *
++ * For spout, the json must contain the required fields:
++ * - tuple - the value to emit
++ *
++ * and may contain the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ *
++ * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received).
++ */
++Storm.prototype.emit = function(messageDetails, onTaskIds) {
++    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
++    //through the callback (will be called when the response arrives). The callback is stored in a list until the
++    //corresponding task id list arrives.
++    if (messageDetails.task) {
++        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
++    }
++
++    if (!onTaskIds) {
++        throw new Error('You must pass a onTaskIds callback when using emit!')
++    }
++
++    this.taskIdsCallbacks.push(onTaskIds);
++    this.__emit(messageDetails);;
++}
++
++
++/**
++ * Emit message to specific task.
++ * @param commandDetails json with the emit details.
++ *
++ * For bolt, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
++ * tuple and return ack when all components have successfully finished processing it.
++ * - task - indicate the task to send the tuple to.
++ * and may contain the optional fields:
++ * - stream (if empty - emit to default stream)
++ *
++ * For spout, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - task - indicate the task to send the tuple to.
++ * and may contain the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ *
++ * Note: unlike emit, emitDirect accepts no onTaskIds callback, so no task-id list is delivered to the caller.
++ */
++Storm.prototype.emitDirect = function(commandDetails) {
++    if (!commandDetails.task) {
++        throw new Error("Emit direct must receive task id!")
++    }
++    this.__emit(commandDetails);
++}
++
++/**
++ * Initialize storm component according to the configuration received.
++ * @param conf configuration object according to storm protocol.
++ * @param context context object according to storm protocol.
++ * @param done callback. Call this method when finished initializing.
++ */
++Storm.prototype.initialize = function(conf, context, done) {
++    done();
++}
++
++Storm.prototype.run = function() {
++    process.stdout.setEncoding('utf8');
++    process.stdin.setEncoding('utf8');
++    this.startReadingInput();
++}
++
++function Tuple(id, component, stream, task, values) {
++    this.id = id;
++    this.component = component;
++    this.stream = stream;
++    this.task = task;
++    this.values = values;
++}
++
++/**
++ * Base class for storm bolt.
++ * To create a bolt implement 'process' method.
++ * You may also implement the 'initialize' method to set up the bolt from the conf and context it receives.
++ */
++function BasicBolt() {
++    Storm.call(this);
++    this.anchorTuple = null;
++};
++
++BasicBolt.prototype = Object.create(Storm.prototype);
++BasicBolt.prototype.constructor = BasicBolt;
++
++/**
++ * Emit message.
++ * @param commandDetails json with the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
++ * tuple and return ack when all components have successfully finished processing it.
++ * and the optional fields:
++ * - stream (if empty - emit to default stream)
++ * - task (pass only to emit to specific task)
++ */
++BasicBolt.prototype.__emit = function(commandDetails) {
++    var self = this;
++
++    var message = {
++        command: "emit",
++        tuple: commandDetails.tuple,
++        stream: commandDetails.stream,
++        task: commandDetails.task,
++        anchors: [commandDetails.anchorTupleId]
++    };
++
++    this.sendMsgToParent(message);
++}
++
++BasicBolt.prototype.handleNewCommand = function(command) {
++    var self = this;
++    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
++
++    if (tup.task === -1 && tup.stream === "__heartbeat") {
++        self.sync();
++        return;
++    }
++
++    var callback = function(err) {
++        if (err) {
++            self.fail(tup, err);
++            return;
++        }
++        self.ack(tup);
++    }
++    this.process(tup, callback);
++}
++
++/**
++ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
++ * should it do?).
++ * @param tuple the input of the bolt - what to process.
++ * @param done call this method when done processing.
++ */
++BasicBolt.prototype.process = function(tuple, done) {};
++
++BasicBolt.prototype.ack = function(tup) {
++    this.sendMsgToParent({"command": "ack", "id": tup.id});
++}
++
++BasicBolt.prototype.fail = function(tup, err) {
++    this.sendMsgToParent({"command": "fail", "id": tup.id});
++}
++
++
++/**
++ * Base class for storm spout.
++ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
++ * can stay empty).
++ * You may also implement initialize method.
++ *
++ */
++function Spout() {
++    Storm.call(this);
++};
++
++Spout.prototype = Object.create(Storm.prototype);
++
++Spout.prototype.constructor = Spout;
++
++/**
++ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
++ * @param id The id of the tuple.
++ * @param done Call this method when finished and ready to receive more tuples.
++ */
++Spout.prototype.ack = function(id, done) {};
++
++/**
++ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
++ * log the failure or send the tuple again).
++ * @param id The id of the tuple.
++ * @param done Call this method when finished and ready to receive more tuples.
++ */
++Spout.prototype.fail = function(id, done) {};
++
++/**
++ * Method that indicates it's time to emit the next tuple.
++ * @param done call this method when done sending the output.
++ */
++Spout.prototype.nextTuple = function(done) {};
++
++Spout.prototype.handleNewCommand = function(command) {
++    var self = this;
++    var callback = function() {
++        self.sync();
++    }
++
++    if (command["command"] === "next") {
++        this.nextTuple(callback);
++    }
++
++    if (command["command"] === "ack") {
++        this.ack(command["id"], callback);
++    }
++
++    if (command["command"] === "fail") {
++        this.fail(command["id"], callback);
++    }
++}
++
++/**
++ * @param commandDetails json with the required fields:
++ * - tuple - the value to emit.
++ * and the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ * - task - pass only to emit to specific task.
++ */
++Spout.prototype.__emit = function(commandDetails) {
++    var message = {
++        command: "emit",
++        tuple: commandDetails.tuple,
++        id: commandDetails.id,
++        stream: commandDetails.stream,
++        task: commandDetails.task
++    };
++
++    this.sendMsgToParent(message);
++}
++
++module.exports.BasicBolt = BasicBolt;
++module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.py
index 0000000,0000000..642c393
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.py
@@@ -1,0 -1,0 +1,260 @@@
++# -*- coding: utf-8 -*-
++
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements.  See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership.  The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License.  You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++import sys
++import os
++import traceback
++from collections import deque
++
++try:
++    import simplejson as json
++except ImportError:
++    import json
++
++json_encode = lambda x: json.dumps(x)
++json_decode = lambda x: json.loads(x)
++
++#reads lines and reconstructs newlines appropriately
++def readMsg():
++    msg = ""
++    while True:
++        line = sys.stdin.readline()
++        if not line:
++            raise Exception('Read EOF from stdin')
++        if line[0:-1] == "end":
++            break
++        msg = msg + line
++    return json_decode(msg[0:-1])
++
++MODE = None
++ANCHOR_TUPLE = None
++
++#queue up commands we read while trying to read taskids
++pending_commands = deque()
++
++def readTaskIds():
++    if pending_taskids:
++        return pending_taskids.popleft()
++    else:
++        msg = readMsg()
++        while type(msg) is not list:
++            pending_commands.append(msg)
++            msg = readMsg()
++        return msg
++
++#queue up taskids we read while trying to read commands/tuples
++pending_taskids = deque()
++
++def readCommand():
++    if pending_commands:
++        return pending_commands.popleft()
++    else:
++        msg = readMsg()
++        while type(msg) is list:
++            pending_taskids.append(msg)
++            msg = readMsg()
++        return msg
++
++def readTuple():
++    cmd = readCommand()
++    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
++
++def sendMsgToParent(msg):
++    print json_encode(msg)
++    print "end"
++    sys.stdout.flush()
++
++def sync():
++    sendMsgToParent({'command':'sync'})
++
++def sendpid(heartbeatdir):
++    pid = os.getpid()
++    sendMsgToParent({'pid':pid})
++    open(heartbeatdir + "/" + str(pid), "w").close()
++
++def emit(*args, **kwargs):
++    __emit(*args, **kwargs)
++    return readTaskIds()
++
++def emitDirect(task, *args, **kwargs):
++    kwargs["directTask"] = task
++    __emit(*args, **kwargs)
++
++def __emit(*args, **kwargs):
++    global MODE
++    if MODE == Bolt:
++        emitBolt(*args, **kwargs)
++    elif MODE == Spout:
++        emitSpout(*args, **kwargs)
++
++def emitBolt(tup, stream=None, anchors = [], directTask=None):
++    global ANCHOR_TUPLE
++    if ANCHOR_TUPLE is not None:
++        anchors = [ANCHOR_TUPLE]
++    m = {"command": "emit"}
++    if stream is not None:
++        m["stream"] = stream
++    m["anchors"] = map(lambda a: a.id, anchors)
++    if directTask is not None:
++        m["task"] = directTask
++    m["tuple"] = tup
++    sendMsgToParent(m)
++
++def emitSpout(tup, stream=None, id=None, directTask=None):
++    m = {"command": "emit"}
++    if id is not None:
++        m["id"] = id
++    if stream is not None:
++        m["stream"] = stream
++    if directTask is not None:
++        m["task"] = directTask
++    m["tuple"] = tup
++    sendMsgToParent(m)
++
++def ack(tup):
++    sendMsgToParent({"command": "ack", "id": tup.id})
++
++def fail(tup):
++    sendMsgToParent({"command": "fail", "id": tup.id})
++
++def reportError(msg):
++    sendMsgToParent({"command": "error", "msg": msg})
++
++def log(msg, level=2):
++    sendMsgToParent({"command": "log", "msg": msg, "level":level})
++
++def logTrace(msg):
++    log(msg, 0)
++
++def logDebug(msg):
++    log(msg, 1)
++
++def logInfo(msg):
++    log(msg, 2)
++
++def logWarn(msg):
++    log(msg, 3)
++
++def logError(msg):
++    log(msg, 4)
++
++def rpcMetrics(name, params):
++    sendMsgToParent({"command": "metrics", "name": name, "params": params})
++
++def initComponent():
++    setupInfo = readMsg()
++    sendpid(setupInfo['pidDir'])
++    return [setupInfo['conf'], setupInfo['context']]
++
++class Tuple(object):
++    def __init__(self, id, component, stream, task, values):
++        self.id = id
++        self.component = component
++        self.stream = stream
++        self.task = task
++        self.values = values
++
++    def __repr__(self):
++        return '<%s%s>' % (
++            self.__class__.__name__,
++            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
++
++    def is_heartbeat_tuple(self):
++        return self.task == -1 and self.stream == "__heartbeat"
++
++class Bolt(object):
++    def initialize(self, stormconf, context):
++        pass
++
++    def process(self, tuple):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Bolt
++        conf, context = initComponent()
++        try:
++            self.initialize(conf, context)
++            while True:
++                tup = readTuple()
++                if tup.is_heartbeat_tuple():
++                    sync()
++                else:
++                    self.process(tup)
++        except Exception, e:
++            reportError(traceback.format_exc(e))
++
++class BasicBolt(object):
++    def initialize(self, stormconf, context):
++        pass
++
++    def process(self, tuple):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Bolt
++        global ANCHOR_TUPLE
++        conf, context = initComponent()
++        try:
++            self.initialize(conf, context)
++            while True:
++                tup = readTuple()
++                if tup.is_heartbeat_tuple():
++                    sync()
++                else:
++                    ANCHOR_TUPLE = tup
++                    try:
++                        self.process(tup)
++                        ack(tup)
++                    except Exception, e:
++                        reportError(traceback.format_exc(e))
++                        fail(tup)
++        except Exception, e:
++            reportError(traceback.format_exc(e))
++
++class Spout(object):
++    def initialize(self, conf, context):
++        pass
++
++    def ack(self, id):
++        pass
++
++    def fail(self, id):
++        pass
++
++    def nextTuple(self):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Spout
++        conf, context = initComponent()
++        try:
++            self.initialize(conf, context)
++            while True:
++                msg = readCommand()
++                if msg["command"] == "next":
++                    self.nextTuple()
++                if msg["command"] == "ack":
++                    self.ack(msg["id"])
++                if msg["command"] == "fail":
++                    self.fail(msg["id"])
++                sync()
++        except Exception, e:
++            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/pom.xml
index 0000000,0000000..5ea1b40
new file mode 100644
--- /dev/null
+++ b/external/flux/pom.xml
@@@ -1,0 -1,0 +1,126 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements.  See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License.  You may obtain a copy of the License at
++
++     http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++    <modelVersion>4.0.0</modelVersion>
++
++    <groupId>com.github.ptgoetz</groupId>
++    <artifactId>flux</artifactId>
++    <version>0.3.1-SNAPSHOT</version>
++    <packaging>pom</packaging>
++    <name>flux</name>
++    <url>https://github.com/ptgoetz/flux</url>
++
++    <parent>
++        <groupId>org.sonatype.oss</groupId>
++        <artifactId>oss-parent</artifactId>
++        <version>7</version>
++    </parent>
++    <scm>
++        <connection>scm:git:git@github.com:ptgoetz/flux.git</connection>
++        <developerConnection>scm:git:git@github.com:ptgoetz/flux.git</developerConnection>
++        <!-- browsable repository URL (the previous value had a stray leading colon) -->
++        <url>https://github.com/ptgoetz/flux</url>
++    </scm>
++
++    <developers>
++        <developer>
++            <id>ptgoetz</id>
++            <name>P. Taylor Goetz</name>
++            <email>ptgoetz@apache.org</email>
++        </developer>
++    </developers>
++
++    <properties>
++        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
++        <storm.version>0.9.3</storm.version>
++        <!-- see comment below... This fixes an annoyance with intellij -->
++        <provided.scope>provided</provided.scope>
++    </properties>
++
++    <profiles>
++        <!--
++            Hack to make intellij behave.
++            If you use intellij, enable this profile in your IDE.
++            It should make life easier.
++        -->
++        <profile>
++            <id>intellij</id>
++            <properties>
++                <provided.scope>compile</provided.scope>
++            </properties>
++        </profile>
++    </profiles>
++
++    <modules>
++        <module>flux-wrappers</module>
++        <module>flux-core</module>
++        <module>flux-examples</module>
++    </modules>
++
++    <dependencies>
++        <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>storm-core</artifactId>
++            <version>${storm.version}</version>
++            <scope>${provided.scope}</scope>
++        </dependency>
++        <dependency>
++            <groupId>commons-cli</groupId>
++            <artifactId>commons-cli</artifactId>
++            <version>1.2</version>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.kafka</groupId>
++            <artifactId>kafka_2.10</artifactId>
++            <version>0.8.1.1</version>
++            <scope>test</scope>
++            <exclusions>
++                <exclusion>
++                    <groupId>org.apache.zookeeper</groupId>
++                    <artifactId>zookeeper</artifactId>
++                </exclusion>
++                <exclusion>
++                    <groupId>log4j</groupId>
++                    <artifactId>log4j</artifactId>
++                </exclusion>
++            </exclusions>
++        </dependency>
++        <dependency>
++            <groupId>junit</groupId>
++            <artifactId>junit</artifactId>
++            <version>4.11</version>
++            <scope>test</scope>
++        </dependency>
++    </dependencies>
++    <build>
++        <resources>
++
++        </resources>
++        <plugins>
++            <plugin>
++                <groupId>org.apache.maven.plugins</groupId>
++                <artifactId>maven-compiler-plugin</artifactId>
++                <version>3.3</version>
++                <configuration>
++                    <source>1.6</source>
++                    <target>1.6</target>
++                    <encoding>UTF-8</encoding>
++                </configuration>
++            </plugin>
++        </plugins>
++    </build>
++</project>