You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/04 04:06:05 UTC
[23/50] [abbrv] storm git commit: merge flux into external/flux/
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
index 0000000,0000000..9007869
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_hdfs.yaml
@@@ -1,0 -1,0 +1,105 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++# Example topology that writes tuples from a test word spout to HDFS (and logs them)
++---
++
++# topology definition
++# name to be used when submitting
++name: "hdfs-topology"
++
++# Components
++# Components are analogous to Spring beans. They are meant to be used as constructor,
++# property(setter), and builder arguments.
++#
++# for the time being, components must be declared in the order they are referenced
++components:
++ - id: "syncPolicy"
++ className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
++ constructorArgs:
++ - 1000
++ - id: "rotationPolicy"
++ className: "org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy"
++ constructorArgs:
++ - 30
++ - SECONDS
++
++ - id: "fileNameFormat"
++ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
++ configMethods:
++ - name: "withPath"
++ args: ["${hdfs.write.dir}"]
++ - name: "withExtension"
++ args: [".txt"]
++
++ - id: "recordFormat"
++ className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
++ configMethods:
++ - name: "withFieldDelimiter"
++ args: ["|"]
++
++ - id: "rotationAction"
++ className: "org.apache.storm.hdfs.common.rotation.MoveFileAction"
++ configMethods:
++ - name: "toDestination"
++ args: ["${hdfs.dest.dir}"]
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++ # ...
++
++# bolt definitions
++
++bolts:
++ - id: "bolt-1"
++ className: "org.apache.storm.hdfs.bolt.HdfsBolt"
++ configMethods:
++ - name: "withConfigKey"
++ args: ["hdfs.config"]
++ - name: "withFsUrl"
++ args: ["${hdfs.url}"]
++ - name: "withFileNameFormat"
++ args: [ref: "fileNameFormat"]
++ - name: "withRecordFormat"
++ args: [ref: "recordFormat"]
++ - name: "withRotationPolicy"
++ args: [ref: "rotationPolicy"]
++ - name: "withSyncPolicy"
++ args: [ref: "syncPolicy"]
++ - name: "addRotationAction"
++ args: [ref: "rotationAction"]
++ parallelism: 1
++ # ...
++
++ - id: "bolt-2"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++
++streams:
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "spout-1"
++ to: "bolt-1"
++ grouping:
++ type: SHUFFLE
++
++ - name: "" # name isn't used (placeholder for logging, UI, etc.)
++ from: "spout-1"
++ to: "bolt-2"
++ grouping:
++ type: SHUFFLE
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
----------------------------------------------------------------------
diff --cc external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
index 0000000,0000000..380f9d2
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-examples/src/main/resources/simple_wordcount.yaml
@@@ -1,0 -1,0 +1,68 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++---
++
++# topology definition
++# name to be used when submitting
++name: "yaml-topology"
++
++# topology configuration
++# this will be passed to the submitter as a map of config options
++#
++config:
++ topology.workers: 1
++
++# spout definitions
++spouts:
++ - id: "spout-1"
++ className: "backtype.storm.testing.TestWordSpout"
++ parallelism: 1
++
++# bolt definitions
++bolts:
++ - id: "bolt-1"
++ className: "backtype.storm.testing.TestWordCounter"
++ parallelism: 1
++
++ - id: "bolt-2"
++ className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
++ parallelism: 1
++
++#stream definitions
++# stream definitions define connections between spouts and bolts.
++# note that such connections can be cyclical
++streams:
++  - name: "spout-1 --> bolt-1"  # name isn't used (placeholder for logging, UI, etc.)
++#    id: "connection-1"
++    from: "spout-1"
++    to: "bolt-1"
++    grouping:
++      type: FIELDS
++      args: ["word"]
++
++  - name: "bolt-1 --> bolt-2"  # label spells the target exactly like its component id
++    from: "bolt-1"
++    to: "bolt-2"
++    grouping:
++      type: SHUFFLE
++
++
++
++
++
++
++
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-ui/README.md
----------------------------------------------------------------------
diff --cc external/flux/flux-ui/README.md
index 0000000,0000000..8b6bd5f
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-ui/README.md
@@@ -1,0 -1,0 +1,3 @@@
++# Flux-UI
++
++Placeholder for Flux GUI
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/pom.xml
index 0000000,0000000..6784141
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/pom.xml
@@@ -1,0 -1,0 +1,35 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++ <modelVersion>4.0.0</modelVersion>
++
++ <parent>
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux</artifactId>
++ <version>0.3.1-SNAPSHOT</version>
++ <relativePath>../pom.xml</relativePath>
++ </parent>
++
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux-wrappers</artifactId>
++ <packaging>jar</packaging>
++
++ <name>flux-wrappers</name>
++ <url>https://github.com/ptgoetz/flux</url>
++
++</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
index 0000000,0000000..4e0f91c
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/FluxShellBolt.java
@@@ -1,0 -1,0 +1,56 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.wrappers.bolts;
++
++import backtype.storm.task.ShellBolt;
++import backtype.storm.topology.IRichBolt;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++
++import java.util.Map;
++
++/**
++ * A generic `ShellBolt` implementation that allows you to specify output fields
++ * without having to subclass `ShellBolt` to do so.
++ *
++ */
++public class FluxShellBolt extends ShellBolt implements IRichBolt{
++    // Names of the fields this bolt emits; may be null when none were supplied.
++    private String[] outputFields;
++    // Never assigned in this class, so getComponentConfiguration() currently returns null.
++    private Map<String, Object> componentConfig;
++
++    /**
++     * Create a ShellBolt with command line arguments and output fields
++     * @param command Command line arguments for the bolt
++     * @param outputFields Names of fields the bolt will emit (if any).
++     */
++    public FluxShellBolt(String[] command, String[] outputFields){
++        super(command);
++        this.outputFields = outputFields;
++    }
++
++    /**
++     * Declares the configured output fields on the given declarer.
++     * Nothing is declared when no output fields were supplied: handing a null
++     * array to `new Fields(...)` is expected to fail with a NullPointerException.
++     * @param declarer receives the output field declaration
++     */
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        if (this.outputFields == null) {
++            return;
++        }
++        declarer.declare(new Fields(this.outputFields));
++    }
++
++    /**
++     * @return the component configuration held by this bolt (null unless it has been set)
++     */
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++        return this.componentConfig;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
index 0000000,0000000..a42d7c3
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/bolts/LogInfoBolt.java
@@@ -1,0 -1,0 +1,44 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.storm.flux.wrappers.bolts;
++
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Tuple;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++/**
++ * Simple bolt that does nothing other than LOG.info() every tuple received.
++ *
++ */
++public class LogInfoBolt extends BaseBasicBolt {
++ private static final Logger LOG = LoggerFactory.getLogger(LogInfoBolt.class);
++
++ @Override
++ public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
++ LOG.info("{}", tuple);
++ }
++
++ @Override
++ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
++
++ }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
index 0000000,0000000..c7e9058
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/java/org/apache/storm/flux/wrappers/spouts/FluxShellSpout.java
@@@ -1,0 -1,0 +1,55 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.storm.flux.wrappers.spouts;
++
++import backtype.storm.spout.ShellSpout;
++import backtype.storm.topology.IRichSpout;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.tuple.Fields;
++
++import java.util.Map;
++
++/**
++ * A generic `ShellSpout` implementation that allows you to specify output fields
++ * without having to subclass `ShellSpout` to do so.
++ *
++ */
++public class FluxShellSpout extends ShellSpout implements IRichSpout {
++    // Names of the fields this spout emits; may be null when none were supplied.
++    private String[] outputFields;
++    // Never assigned in this class, so getComponentConfiguration() currently returns null.
++    private Map<String, Object> componentConfig;
++
++    /**
++     * Create a ShellSpout with command line arguments and output fields
++     * @param args Command line arguments for the spout
++     * @param outputFields Names of fields the spout will emit.
++     */
++    public FluxShellSpout(String[] args, String[] outputFields){
++        super(args);
++        this.outputFields = outputFields;
++    }
++
++    /**
++     * Declares the configured output fields on the given declarer.
++     * Nothing is declared when no output fields were supplied: handing a null
++     * array to `new Fields(...)` is expected to fail with a NullPointerException.
++     * @param declarer receives the output field declaration
++     */
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        if (this.outputFields == null) {
++            return;
++        }
++        declarer.declare(new Fields(this.outputFields));
++    }
++
++    /**
++     * @return the component configuration held by this spout (null unless it has been set)
++     */
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++        return this.componentConfig;
++    }
++}
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
index 0000000,0000000..36fc5f5
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/randomsentence.js
@@@ -1,0 -1,0 +1,93 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++/**
++ * Example for storm spout. Emits random sentences.
++ * The original class in java - storm.starter.spout.RandomSentenceSpout.
++ *
++ */
++
++var storm = require('./storm');
++var Spout = storm.Spout;
++
++
++var SENTENCES = [
++ "the cow jumped over the moon",
++ "an apple a day keeps the doctor away",
++ "four score and seven years ago",
++ "snow white and the seven dwarfs",
++ "i am at two with nature"]
++
++function RandomSentenceSpout(sentences) {
++ Spout.call(this);
++ this.runningTupleId = 0;
++ this.sentences = sentences;
++ this.pending = {};
++};
++
++RandomSentenceSpout.prototype = Object.create(Spout.prototype);
++RandomSentenceSpout.prototype.constructor = RandomSentenceSpout;
++
++RandomSentenceSpout.prototype.getRandomSentence = function() {
++ return this.sentences[getRandomInt(0, this.sentences.length - 1)];
++}
++
++RandomSentenceSpout.prototype.nextTuple = function(done) {
++ var self = this;
++ var sentence = this.getRandomSentence();
++ var tup = [sentence];
++ var id = this.createNextTupleId();
++ this.pending[id] = tup;
++ //This timeout can be removed if TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS is configured to 100
++ setTimeout(function() {
++ self.emit({tuple: tup, id: id}, function(taskIds) {
++ self.log(tup + ' sent to task ids - ' + taskIds);
++ });
++ done();
++ },100);
++}
++
++RandomSentenceSpout.prototype.createNextTupleId = function() {
++ var id = this.runningTupleId;
++ this.runningTupleId++;
++ return id;
++}
++
++RandomSentenceSpout.prototype.ack = function(id, done) {
++ this.log('Received ack for - ' + id);
++ delete this.pending[id];
++ done();
++}
++
++RandomSentenceSpout.prototype.fail = function(id, done) {
++ var self = this;
++ this.log('Received fail for - ' + id + '. Retrying.');
++ this.emit({tuple: this.pending[id], id:id}, function(taskIds) {
++ self.log(self.pending[id] + ' sent to task ids - ' + taskIds);
++ });
++ done();
++}
++
++/**
++ * Returns a random integer between min (inclusive) and max (inclusive)
++ */
++function getRandomInt(min, max) {
++ return Math.floor(Math.random() * (max - min + 1)) + min;
++}
++
++new RandomSentenceSpout(SENTENCES).run();
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
index 0000000,0000000..300105f
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/splitsentence.py
@@@ -1,0 -1,0 +1,24 @@@
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++import storm
++
++class SplitSentenceBolt(storm.BasicBolt):
++ def process(self, tup):
++ words = tup.values[0].split(" ")
++ for word in words:
++ storm.emit([word])
++
++SplitSentenceBolt().run()
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.js
index 0000000,0000000..355c2d2
new file mode 100755
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.js
@@@ -1,0 -1,0 +1,373 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++/**
++ * Base classes in node-js for storm Bolt and Spout.
++ * Implements the storm multilang protocol for nodejs.
++ */
++
++
++var fs = require('fs');
++
++function Storm() {
++ this.messagePart = "";
++ this.taskIdsCallbacks = [];
++ this.isFirstMessage = true;
++ this.separator = '\nend\n';
++}
++
++Storm.prototype.sendMsgToParent = function(msg) {
++ var str = JSON.stringify(msg);
++ process.stdout.write(str + this.separator);
++}
++
++Storm.prototype.sync = function() {
++ this.sendMsgToParent({"command":"sync"});
++}
++
++Storm.prototype.sendPid = function(heartbeatdir) {
++ var pid = process.pid;
++ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
++ this.sendMsgToParent({"pid": pid})
++}
++
++Storm.prototype.log = function(msg) {
++ this.sendMsgToParent({"command": "log", "msg": msg});
++}
++
++Storm.prototype.initSetupInfo = function(setupInfo) {
++ var self = this;
++ var callback = function() {
++ self.sendPid(setupInfo['pidDir']);
++ }
++ this.initialize(setupInfo['conf'], setupInfo['context'], callback);
++}
++
++Storm.prototype.startReadingInput = function() {
++ var self = this;
++ process.stdin.on('readable', function() {
++ var chunk = process.stdin.read();
++ var messages = self.handleNewChunk(chunk);
++ messages.forEach(function(message) {
++ self.handleNewMessage(message);
++ })
++
++ });
++}
++
++/**
++ * receives a new string chunk and returns a list of new messages with the separator removed
++ * stores state in this.messagePart
++ * @param chunk
++ */
++Storm.prototype.handleNewChunk = function(chunk) {
++ //invariant: this.messagePart has no separator otherwise we would have parsed it already
++ var messages = [];
++ if (chunk && chunk.length !== 0) {
++ //"{}".split("\nend\n") ==> ['{}']
++ //"\nend\n".split("\nend\n") ==> ['' , '']
++ //"{}\nend\n".split("\nend\n") ==> ['{}', '']
++ //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
++ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
++ this.messagePart = this.messagePart + chunk;
++ var newMessageParts = this.messagePart.split(this.separator);
++ while (newMessageParts.length > 0) {
++ var potentialMessage = newMessageParts.shift();
++ var anotherMessageAhead = newMessageParts.length > 0;
++ if (!anotherMessageAhead) {
++ this.messagePart = potentialMessage;
++ }
++ else if (potentialMessage.length > 0) {
++ messages.push(potentialMessage);
++ }
++ }
++ }
++ return messages;
++}
++
++Storm.prototype.isTaskIds = function(msg) {
++ return (msg instanceof Array);
++}
++
++Storm.prototype.handleNewMessage = function(msg) {
++ var parsedMsg = JSON.parse(msg);
++
++ if (this.isFirstMessage) {
++ this.initSetupInfo(parsedMsg);
++ this.isFirstMessage = false;
++ } else if (this.isTaskIds(parsedMsg)) {
++ this.handleNewTaskId(parsedMsg);
++ } else {
++ this.handleNewCommand(parsedMsg);
++ }
++}
++
++Storm.prototype.handleNewTaskId = function(taskIds) {
++ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
++ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
++ //take the first callback in the list and be sure it is the right one.
++
++ var callback = this.taskIdsCallbacks.shift();
++ if (callback) {
++ callback(taskIds);
++ } else {
++ throw new Error('Something went wrong, we off the split of task id callbacks');
++ }
++}
++
++
++
++/**
++ *
++ * @param messageDetails json with the emit details.
++ *
++ * For bolt, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
++ * tuple and return ack when all components successfully finished to process it.
++ * and may contain the optional fields:
++ * - stream (if empty - emit to default stream)
++ *
++ * For spout, the json must contain the required fields:
++ * - tuple - the value to emit
++ *
++ * and may contain the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ *
++ * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received).
++ */
++Storm.prototype.emit = function(messageDetails, onTaskIds) {
++ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
++ //through the callback (will be called when the response arrives). The callback is stored in a list until the
++ //corresponding task id list arrives.
++ if (messageDetails.task) {
++ throw new Error('Illegal input - task. To emit to specific task use emit direct!');
++ }
++
++ if (!onTaskIds) {
++ throw new Error('You must pass a onTaskIds callback when using emit!')
++ }
++
++ this.taskIdsCallbacks.push(onTaskIds);
++ this.__emit(messageDetails);;
++}
++
++
++/**
++ * Emit message to specific task.
++ * @param commandDetails json with the emit details.
++ *
++ * For bolt, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
++ * tuple and return ack when all components successfully finished to process it.
++ * - task - indicate the task to send the tuple to.
++ * and may contain the optional fields:
++ * - stream (if empty - emit to default stream)
++ *
++ * For spout, the json must contain the required fields:
++ * - tuple - the value to emit
++ * - task - indicate the task to send the tuple to.
++ * and may contain the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ *
++ * Unlike emit, emitDirect takes no onTaskIds callback.
++ */
++Storm.prototype.emitDirect = function(commandDetails) {
++ if (!commandDetails.task) {
++ throw new Error("Emit direct must receive task id!")
++ }
++ this.__emit(commandDetails);
++}
++
++/**
++ * Initialize storm component according to the configuration received.
++ * @param conf configuration object according to storm protocol.
++ * @param context context object according to storm protocol.
++ * @param done callback. Call this method when finished initializing.
++ */
++Storm.prototype.initialize = function(conf, context, done) {
++ done();
++}
++
++Storm.prototype.run = function() {
++ process.stdout.setEncoding('utf8');
++ process.stdin.setEncoding('utf8');
++ this.startReadingInput();
++}
++
++function Tuple(id, component, stream, task, values) {
++ this.id = id;
++ this.component = component;
++ this.stream = stream;
++ this.task = task;
++ this.values = values;
++}
++
++/**
++ * Base class for storm bolt.
++ * To create a bolt implement 'process' method.
++ * You may also implement initialize method to set up the bolt according to the configuration received.
++ */
++function BasicBolt() {
++ Storm.call(this);
++ this.anchorTuple = null;
++};
++
++BasicBolt.prototype = Object.create(Storm.prototype);
++BasicBolt.prototype.constructor = BasicBolt;
++
++/**
++ * Emit message.
++ * @param commandDetails json with the required fields:
++ * - tuple - the value to emit
++ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
++ * tuple and return ack when all components successfully finished to process it.
++ * and the optional fields:
++ * - stream (if empty - emit to default stream)
++ * - task (pass only to emit to specific task)
++ */
++BasicBolt.prototype.__emit = function(commandDetails) {
++ var self = this;
++
++ var message = {
++ command: "emit",
++ tuple: commandDetails.tuple,
++ stream: commandDetails.stream,
++ task: commandDetails.task,
++ anchors: [commandDetails.anchorTupleId]
++ };
++
++ this.sendMsgToParent(message);
++}
++
++BasicBolt.prototype.handleNewCommand = function(command) {
++ var self = this;
++ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
++
++ if (tup.task === -1 && tup.stream === "__heartbeat") {
++ self.sync();
++ return;
++ }
++
++ var callback = function(err) {
++ if (err) {
++ self.fail(tup, err);
++ return;
++ }
++ self.ack(tup);
++ }
++ this.process(tup, callback);
++}
++
++/**
++ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
++ * should it do?).
++ * @param tuple the input of the bolt - what to process.
++ * @param done call this method when done processing.
++ */
++BasicBolt.prototype.process = function(tuple, done) {};
++
++BasicBolt.prototype.ack = function(tup) {
++ this.sendMsgToParent({"command": "ack", "id": tup.id});
++}
++
++BasicBolt.prototype.fail = function(tup, err) {
++ this.sendMsgToParent({"command": "fail", "id": tup.id});
++}
++
++
++/**
++ * Base class for storm spout.
++ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
++ * can stay empty).
++ * You may also implement initialize method.
++ *
++ */
++function Spout() {
++ Storm.call(this);
++};
++
++Spout.prototype = Object.create(Storm.prototype);
++
++Spout.prototype.constructor = Spout;
++
++/**
++ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
++ * @param id The id of the tuple.
++ * @param done Call this method when finished and ready to receive more tuples.
++ */
++Spout.prototype.ack = function(id, done) {};
++
++/**
++ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
++ * log the failure or send the tuple again).
++ * @param id The id of the tuple.
++ * @param done Call this method when finished and ready to receive more tuples.
++ */
++Spout.prototype.fail = function(id, done) {};
++
++/**
++ * Method that indicates it's time to emit the next tuple.
++ * @param done call this method when done sending the output.
++ */
++Spout.prototype.nextTuple = function(done) {};
++
++Spout.prototype.handleNewCommand = function(command) {
++ var self = this;
++ var callback = function() {
++ self.sync();
++ }
++
++ if (command["command"] === "next") {
++ this.nextTuple(callback);
++ }
++
++ if (command["command"] === "ack") {
++ this.ack(command["id"], callback);
++ }
++
++ if (command["command"] === "fail") {
++ this.fail(command["id"], callback);
++ }
++}
++
++/**
++ * @param commandDetails json with the required fields:
++ * - tuple - the value to emit.
++ * and the optional fields:
++ * - id - pass id for reliable emit (and receive ack/fail later).
++ * - stream - if empty - emit to default stream.
++ * - task - pass only to emit to specific task.
++ */
++Spout.prototype.__emit = function(commandDetails) {
++ var message = {
++ command: "emit",
++ tuple: commandDetails.tuple,
++ id: commandDetails.id,
++ stream: commandDetails.stream,
++ task: commandDetails.task
++ };
++
++ this.sendMsgToParent(message);
++}
++
++module.exports.BasicBolt = BasicBolt;
++module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/flux-wrappers/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --cc external/flux/flux-wrappers/src/main/resources/resources/storm.py
index 0000000,0000000..642c393
new file mode 100644
--- /dev/null
+++ b/external/flux/flux-wrappers/src/main/resources/resources/storm.py
@@@ -1,0 -1,0 +1,260 @@@
++# -*- coding: utf-8 -*-
++
++# Licensed to the Apache Software Foundation (ASF) under one
++# or more contributor license agreements. See the NOTICE file
++# distributed with this work for additional information
++# regarding copyright ownership. The ASF licenses this file
++# to you under the Apache License, Version 2.0 (the
++# "License"); you may not use this file except in compliance
++# with the License. You may obtain a copy of the License at
++#
++# http://www.apache.org/licenses/LICENSE-2.0
++#
++# Unless required by applicable law or agreed to in writing, software
++# distributed under the License is distributed on an "AS IS" BASIS,
++# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++# See the License for the specific language governing permissions and
++# limitations under the License.
++
++import sys
++import os
++import traceback
++from collections import deque
++
++try:
++ import simplejson as json
++except ImportError:
++ import json
++
++json_encode = lambda x: json.dumps(x)
++json_decode = lambda x: json.loads(x)
++
++#reads lines and reconstructs newlines appropriately
++def readMsg():
++ msg = ""
++ while True:
++ line = sys.stdin.readline()
++ if not line:
++ raise Exception('Read EOF from stdin')
++ if line[0:-1] == "end":
++ break
++ msg = msg + line
++ return json_decode(msg[0:-1])
++
++MODE = None
++ANCHOR_TUPLE = None
++
++#queue up commands we read while trying to read taskids
++pending_commands = deque()
++
++def readTaskIds():
++ if pending_taskids:
++ return pending_taskids.popleft()
++ else:
++ msg = readMsg()
++ while type(msg) is not list:
++ pending_commands.append(msg)
++ msg = readMsg()
++ return msg
++
++#queue up taskids we read while trying to read commands/tuples
++pending_taskids = deque()
++
++def readCommand():
++ if pending_commands:
++ return pending_commands.popleft()
++ else:
++ msg = readMsg()
++ while type(msg) is list:
++ pending_taskids.append(msg)
++ msg = readMsg()
++ return msg
++
++def readTuple():
++ cmd = readCommand()
++ return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
++
++def sendMsgToParent(msg):
++ print json_encode(msg)
++ print "end"
++ sys.stdout.flush()
++
++def sync():
++ sendMsgToParent({'command':'sync'})
++
++def sendpid(heartbeatdir):
++ pid = os.getpid()
++ sendMsgToParent({'pid':pid})
++ open(heartbeatdir + "/" + str(pid), "w").close()
++
++def emit(*args, **kwargs):
++ __emit(*args, **kwargs)
++ return readTaskIds()
++
++def emitDirect(task, *args, **kwargs):
++ kwargs["directTask"] = task
++ __emit(*args, **kwargs)
++
++def __emit(*args, **kwargs):
++ global MODE
++ if MODE == Bolt:
++ emitBolt(*args, **kwargs)
++ elif MODE == Spout:
++ emitSpout(*args, **kwargs)
++
++def emitBolt(tup, stream=None, anchors = [], directTask=None):
++ global ANCHOR_TUPLE
++ if ANCHOR_TUPLE is not None:
++ anchors = [ANCHOR_TUPLE]
++ m = {"command": "emit"}
++ if stream is not None:
++ m["stream"] = stream
++ m["anchors"] = map(lambda a: a.id, anchors)
++ if directTask is not None:
++ m["task"] = directTask
++ m["tuple"] = tup
++ sendMsgToParent(m)
++
++def emitSpout(tup, stream=None, id=None, directTask=None):
++ m = {"command": "emit"}
++ if id is not None:
++ m["id"] = id
++ if stream is not None:
++ m["stream"] = stream
++ if directTask is not None:
++ m["task"] = directTask
++ m["tuple"] = tup
++ sendMsgToParent(m)
++
++def ack(tup):
++ sendMsgToParent({"command": "ack", "id": tup.id})
++
++def fail(tup):
++ sendMsgToParent({"command": "fail", "id": tup.id})
++
++def reportError(msg):
++ sendMsgToParent({"command": "error", "msg": msg})
++
++def log(msg, level=2):
++ sendMsgToParent({"command": "log", "msg": msg, "level":level})
++
++def logTrace(msg):
++ log(msg, 0)
++
++def logDebug(msg):
++ log(msg, 1)
++
++def logInfo(msg):
++ log(msg, 2)
++
++def logWarn(msg):
++ log(msg, 3)
++
++def logError(msg):
++ log(msg, 4)
++
++def rpcMetrics(name, params):
++ sendMsgToParent({"command": "metrics", "name": name, "params": params})
++
++def initComponent():
++ setupInfo = readMsg()
++ sendpid(setupInfo['pidDir'])
++ return [setupInfo['conf'], setupInfo['context']]
++
++class Tuple(object):
++ def __init__(self, id, component, stream, task, values):
++ self.id = id
++ self.component = component
++ self.stream = stream
++ self.task = task
++ self.values = values
++
++ def __repr__(self):
++ return '<%s%s>' % (
++ self.__class__.__name__,
++ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
++
++ def is_heartbeat_tuple(self):
++ return self.task == -1 and self.stream == "__heartbeat"
++
++class Bolt(object):
++ def initialize(self, stormconf, context):
++ pass
++
++ def process(self, tuple):
++ pass
++
++ def run(self):
++ global MODE
++ MODE = Bolt
++ conf, context = initComponent()
++ try:
++ self.initialize(conf, context)
++ while True:
++ tup = readTuple()
++ if tup.is_heartbeat_tuple():
++ sync()
++ else:
++ self.process(tup)
++ except Exception, e:
++ reportError(traceback.format_exc(e))
++
++class BasicBolt(object):
++ def initialize(self, stormconf, context):
++ pass
++
++ def process(self, tuple):
++ pass
++
++ def run(self):
++ global MODE
++ MODE = Bolt
++ global ANCHOR_TUPLE
++ conf, context = initComponent()
++ try:
++ self.initialize(conf, context)
++ while True:
++ tup = readTuple()
++ if tup.is_heartbeat_tuple():
++ sync()
++ else:
++ ANCHOR_TUPLE = tup
++ try:
++ self.process(tup)
++ ack(tup)
++ except Exception, e:
++ reportError(traceback.format_exc(e))
++ fail(tup)
++ except Exception, e:
++ reportError(traceback.format_exc(e))
++
++class Spout(object):
++ def initialize(self, conf, context):
++ pass
++
++ def ack(self, id):
++ pass
++
++ def fail(self, id):
++ pass
++
++ def nextTuple(self):
++ pass
++
++ def run(self):
++ global MODE
++ MODE = Spout
++ conf, context = initComponent()
++ try:
++ self.initialize(conf, context)
++ while True:
++ msg = readCommand()
++ if msg["command"] == "next":
++ self.nextTuple()
++ if msg["command"] == "ack":
++ self.ack(msg["id"])
++ if msg["command"] == "fail":
++ self.fail(msg["id"])
++ sync()
++ except Exception, e:
++ reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/b21a98dd/external/flux/pom.xml
----------------------------------------------------------------------
diff --cc external/flux/pom.xml
index 0000000,0000000..5ea1b40
new file mode 100644
--- /dev/null
+++ b/external/flux/pom.xml
@@@ -1,0 -1,0 +1,126 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++ <modelVersion>4.0.0</modelVersion>
++
++ <groupId>com.github.ptgoetz</groupId>
++ <artifactId>flux</artifactId>
++ <version>0.3.1-SNAPSHOT</version>
++ <packaging>pom</packaging>
++ <name>flux</name>
++ <url>https://github.com/ptgoetz/flux</url>
++
++ <parent>
++ <groupId>org.sonatype.oss</groupId>
++ <artifactId>oss-parent</artifactId>
++ <version>7</version>
++ </parent>
++ <scm>
++ <connection>scm:git:git@github.com:ptgoetz/flux.git</connection>
++ <developerConnection>scm:git:git@github.com:ptgoetz/flux.git</developerConnection>
++ <url>:git@github.com:ptgoetz/flux.git</url> <!-- NOTE(review): value starts with a stray ':' and is not a browsable URL; presumably https://github.com/ptgoetz/flux (the project url above) was intended - confirm -->
++ </scm>
++
++ <developers>
++ <developer>
++ <id>ptgoetz</id>
++ <name>P. Taylor Goetz</name>
++ <email>ptgoetz@apache.org</email>
++ </developer>
++ </developers>
++
++ <properties>
++ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
++ <storm.version>0.9.3</storm.version>
++ <!-- see comment below... This fixes an annoyance with intellij -->
++ <provided.scope>provided</provided.scope>
++ </properties>
++
++ <profiles>
++ <!--
++ Hack to make intellij behave.
++ If you use intellij, enable this profile in your IDE.
++ It should make life easier.
++ -->
++ <profile>
++ <id>intellij</id>
++ <properties>
++ <provided.scope>compile</provided.scope>
++ </properties>
++ </profile>
++ </profiles>
++
++ <modules>
++ <module>flux-wrappers</module>
++ <module>flux-core</module>
++ <module>flux-examples</module>
++ </modules>
++
++ <dependencies>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-core</artifactId>
++ <version>${storm.version}</version>
++ <scope>${provided.scope}</scope>
++ </dependency>
++ <dependency>
++ <groupId>commons-cli</groupId>
++ <artifactId>commons-cli</artifactId>
++ <version>1.2</version>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.kafka</groupId>
++ <artifactId>kafka_2.10</artifactId>
++ <version>0.8.1.1</version>
++ <scope>test</scope>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.zookeeper</groupId>
++ <artifactId>zookeeper</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>log4j</groupId>
++ <artifactId>log4j</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>junit</groupId>
++ <artifactId>junit</artifactId>
++ <version>4.11</version>
++ <scope>test</scope>
++ </dependency>
++ </dependencies>
++ <build>
++ <resources>
++
++ </resources>
++ <plugins>
++ <plugin>
++ <groupId>org.apache.maven.plugins</groupId>
++ <artifactId>maven-compiler-plugin</artifactId>
++ <version>3.3</version>
++ <configuration>
++ <source>1.6</source>
++ <target>1.6</target>
++ <encoding>UTF-8</encoding>
++ </configuration>
++ </plugin>
++ </plugins>
++ </build>
++</project>