You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/03/20 22:55:00 UTC

[03/16] storm git commit: STORM-2416: break out storm-jms-examples

STORM-2416: break out storm-jms-examples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6c8298d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6c8298d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6c8298d

Branch: refs/heads/1.x-branch
Commit: d6c8298d8a596589442cfa049039ff90baa33f5f
Parents: e0b1333
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Mar 16 15:14:26 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Mar 16 15:14:26 2017 -0400

----------------------------------------------------------------------
 examples/storm-jms-examples/README.markdown     |  12 +
 examples/storm-jms-examples/pom.xml             | 151 +++++++
 .../storm/jms/example/ExampleJmsTopology.java   | 131 ++++++
 .../apache/storm/jms/example/GenericBolt.java   | 116 ++++++
 .../storm/jms/example/JsonTupleProducer.java    |  58 +++
 .../storm/jms/example/SpringJmsProvider.java    |  74 ++++
 .../src/main/resources/jms-activemq.xml         |  53 +++
 .../src/main/resources/log4j.properties         |  29 ++
 external/storm-jms/core/pom.xml                 |  95 -----
 .../apache/storm/jms/JmsMessageProducer.java    |  46 ---
 .../java/org/apache/storm/jms/JmsProvider.java  |  48 ---
 .../org/apache/storm/jms/JmsTupleProducer.java  |  58 ---
 .../java/org/apache/storm/jms/bolt/JmsBolt.java | 219 ----------
 .../apache/storm/jms/spout/JmsMessageID.java    |  58 ---
 .../org/apache/storm/jms/spout/JmsSpout.java    | 382 -----------------
 .../org/apache/storm/jms/trident/JmsBatch.java  |  27 --
 .../org/apache/storm/jms/trident/JmsState.java  | 129 ------
 .../storm/jms/trident/JmsStateFactory.java      |  40 --
 .../apache/storm/jms/trident/JmsUpdater.java    |  38 --
 .../storm/jms/trident/TridentJmsSpout.java      | 409 -------------------
 .../apache/storm/jms/spout/JmsSpoutTest.java    |  88 ----
 .../apache/storm/jms/spout/MockJmsProvider.java |  62 ---
 .../jms/spout/MockSpoutOutputCollector.java     |  55 ---
 .../storm/jms/spout/MockTupleProducer.java      |  47 ---
 .../core/src/test/resources/jndi.properties     |  18 -
 external/storm-jms/examples/README.markdown     |  12 -
 external/storm-jms/examples/pom.xml             | 151 -------
 .../storm/jms/example/ExampleJmsTopology.java   | 131 ------
 .../apache/storm/jms/example/GenericBolt.java   | 116 ------
 .../storm/jms/example/JsonTupleProducer.java    |  58 ---
 .../storm/jms/example/SpringJmsProvider.java    |  74 ----
 .../src/main/resources/jms-activemq.xml         |  53 ---
 .../src/main/resources/log4j.properties         |  29 --
 external/storm-jms/pom.xml                      |  39 +-
 .../apache/storm/jms/JmsMessageProducer.java    |  46 +++
 .../java/org/apache/storm/jms/JmsProvider.java  |  48 +++
 .../org/apache/storm/jms/JmsTupleProducer.java  |  58 +++
 .../java/org/apache/storm/jms/bolt/JmsBolt.java | 219 ++++++++++
 .../apache/storm/jms/spout/JmsMessageID.java    |  58 +++
 .../org/apache/storm/jms/spout/JmsSpout.java    | 382 +++++++++++++++++
 .../org/apache/storm/jms/trident/JmsBatch.java  |  27 ++
 .../org/apache/storm/jms/trident/JmsState.java  | 129 ++++++
 .../storm/jms/trident/JmsStateFactory.java      |  40 ++
 .../apache/storm/jms/trident/JmsUpdater.java    |  38 ++
 .../storm/jms/trident/TridentJmsSpout.java      | 409 +++++++++++++++++++
 .../apache/storm/jms/spout/JmsSpoutTest.java    |  88 ++++
 .../apache/storm/jms/spout/MockJmsProvider.java |  62 +++
 .../jms/spout/MockSpoutOutputCollector.java     |  55 +++
 .../storm/jms/spout/MockTupleProducer.java      |  47 +++
 .../src/test/resources/jndi.properties          |  18 +
 pom.xml                                         |   1 +
 51 files changed, 2381 insertions(+), 2450 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/README.markdown b/examples/storm-jms-examples/README.markdown
new file mode 100644
index 0000000..7a4d8f0
--- /dev/null
+++ b/examples/storm-jms-examples/README.markdown
@@ -0,0 +1,12 @@
+## About Storm JMS Examples
+This project contains a simple storm topology that illustrates the usage of "storm-jms".
+
+To build:
+
+`mvn clean install`
+
+The default build will create a jar file in the "target" directory that can be deployed to a Storm cluster:
+
+`storm-jms-examples-1.1.0-SNAPSHOT-jar-with-dependencies.jar`
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
new file mode 100644
index 0000000..6451283
--- /dev/null
+++ b/examples/storm-jms-examples/pom.xml
@@ -0,0 +1,151 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+
+    <artifactId>storm-jms-examples</artifactId>
+
+    <properties>
+        <spring.version>2.5.6</spring.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-beans</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jms</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.xbean</groupId>
+            <artifactId>xbean-spring</artifactId>
+            <version>3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <!-- keep storm out of the jar-with-dependencies -->
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-jms</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>5.4.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <!-- Bind the maven-assembly-plugin to the package phase. This will create
+                a jar file, without the storm dependencies, suitable for deployment to a cluster. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.2.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <executable>java</executable>
+                    <includeProjectDependencies>true</includeProjectDependencies>
+                    <includePluginDependencies>true</includePluginDependencies>
+                    <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass>
+                    <systemProperties>
+                        <systemProperty>
+                            <key>log4j.configuration</key>
+                            <value>file:./src/main/resources/log4j.properties</value>
+                        </systemProperty>
+                    </systemProperties>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.storm</groupId>
+                        <artifactId>storm-core</artifactId>
+                        <version>${project.version}</version>
+                        <type>jar</type>
+                    </dependency>
+                </dependencies>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
new file mode 100644
index 0000000..3324aac
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.jms.JmsMessageProducer;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.jms.JmsTupleProducer;
+import org.apache.storm.jms.bolt.JmsBolt;
+import org.apache.storm.jms.spout.JmsSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.utils.Utils;
+
+public class ExampleJmsTopology {
+    public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT";
+    public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT";
+    public static final String FINAL_BOLT = "FINAL_BOLT";
+    public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT";
+    public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT";
+    public static final String ANOTHER_BOLT = "ANOTHER_BOLT";
+
+    @SuppressWarnings("serial")
+    public static void main(String[] args) throws Exception {
+
+        // JMS Queue Provider
+        JmsProvider jmsQueueProvider = new SpringJmsProvider(
+                "jms-activemq.xml", "jmsConnectionFactory",
+                "notificationQueue");
+
+        // JMS Topic provider
+        JmsProvider jmsTopicProvider = new SpringJmsProvider(
+                "jms-activemq.xml", "jmsConnectionFactory",
+                "notificationTopic");
+
+        // JMS Producer
+        JmsTupleProducer producer = new JsonTupleProducer();
+
+        // JMS Queue Spout
+        JmsSpout queueSpout = new JmsSpout();
+        queueSpout.setJmsProvider(jmsQueueProvider);
+        queueSpout.setJmsTupleProducer(producer);
+        queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+        queueSpout.setDistributed(true); // allow multiple instances
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        // spout with 5 parallel instances
+        builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5);
+
+        // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks
+        builder.setBolt(INTERMEDIATE_BOLT,
+                new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping(
+                JMS_QUEUE_SPOUT);
+
+        // bolt that subscribes to the intermediate bolt, and auto-acks
+        // messages.
+        builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping(
+                INTERMEDIATE_BOLT);
+
+        // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic
+        JmsBolt jmsBolt = new JmsBolt();
+        jmsBolt.setJmsProvider(jmsTopicProvider);
+
+        // anonymous message producer just calls toString() on the tuple to create a jms message
+        jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
+            @Override
+            public Message toMessage(Session session, ITuple input) throws JMSException {
+                System.out.println("Sending JMS Message:" + input.toString());
+                TextMessage tm = session.createTextMessage(input.toString());
+                return tm;
+            }
+        });
+
+        builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT);
+
+        // JMS Topic spout
+        JmsSpout topicSpout = new JmsSpout();
+        topicSpout.setJmsProvider(jmsTopicProvider);
+        topicSpout.setJmsTupleProducer(producer);
+        topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+        topicSpout.setDistributed(false);
+
+        builder.setSpout(JMS_TOPIC_SPOUT, topicSpout);
+
+        builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping(
+                JMS_TOPIC_SPOUT);
+
+        Config conf = new Config();
+
+        if (args.length > 0) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopology(args[0], conf,
+                    builder.createTopology());
+        } else {
+
+            conf.setDebug(true);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("storm-jms-example", conf, builder.createTopology());
+            Utils.sleep(60000);
+            cluster.killTopology("storm-jms-example");
+            cluster.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
new file mode 100644
index 0000000..57de1ba
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import java.util.Map;
+
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * A generic <code>org.apache.storm.topology.IRichBolt</code> implementation
+ * for testing/debugging the Storm JMS Spout and example topologies.
+ * <p>
+ * To enable debugging output, set the log level of the
+ * <code>org.apache.storm.jms.example</code> package (the package of this
+ * bolt's logger) to DEBUG.
+ *
+ * @author tgoetz
+ */
+@SuppressWarnings("serial")
+public class GenericBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class);
+    private OutputCollector collector;
+    private boolean autoAck = false;
+    private boolean autoAnchor = false;
+    private Fields declaredFields;
+    private String name;
+
+    /**
+     * Constructs a new <code>GenericBolt</code> instance.
+     *
+     * @param name           The name of the bolt (used in DEBUG logging)
+     * @param autoAck        Whether or not this bolt should automatically acknowledge received tuples.
+     * @param autoAnchor     Whether or not this bolt should automatically anchor to received tuples.
+     * @param declaredFields The fields this bolt declares as output.
+     */
+    public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields) {
+        this.name = name;
+        this.autoAck = autoAck;
+        this.autoAnchor = autoAnchor;
+        this.declaredFields = declaredFields;
+    }
+
+    public GenericBolt(String name, boolean autoAck, boolean autoAnchor) {
+        this(name, autoAck, autoAnchor, null);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map stormConf, TopologyContext context,
+                        OutputCollector collector) {
+        this.collector = collector;
+
+    }
+
+    public void execute(Tuple input) {
+        LOG.debug("[" + this.name + "] Received message: " + input);
+
+
+        // only emit if we have declared fields.
+        if (this.declaredFields != null) {
+            LOG.debug("[" + this.name + "] emitting: " + input);
+            if (this.autoAnchor) {
+                this.collector.emit(input, input.getValues());
+            } else {
+                this.collector.emit(input.getValues());
+            }
+        }
+
+        if (this.autoAck) {
+            LOG.debug("[" + this.name + "] ACKing tuple: " + input);
+            this.collector.ack(input);
+        }
+
+    }
+
+    public void cleanup() {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (this.declaredFields != null) {
+            declarer.declare(this.declaredFields);
+        }
+    }
+
+    public boolean isAutoAck() {
+        return this.autoAck;
+    }
+
+    public void setAutoAck(boolean autoAck) {
+        this.autoAck = autoAck;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
new file mode 100644
index 0000000..9ee175e
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.storm.jms.JmsTupleProducer;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * A simple <code>JmsTupleProducer</code> that expects to receive
+ * JMS <code>TextMessage</code> objects with a body in JSON format.
+ * <p/>
+ * Outputs a tuple with field name "json" and a string value
+ * containing the raw json.
+ * <p/>
+ * <b>NOTE: </b> Currently this implementation assumes the text is valid
+ * JSON and does not attempt to parse or validate it.
+ * 
+ * @author tgoetz
+ *
+ */
+@SuppressWarnings("serial")
+public class JsonTupleProducer implements JmsTupleProducer {
+
+	public Values toTuple(Message msg) throws JMSException {
+		if(msg instanceof TextMessage){
+			String json = ((TextMessage) msg).getText();
+			return new Values(json);
+		} else {
+			return null;
+		}
+	}
+
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("json"));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
new file mode 100644
index 0000000..306fc25
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.example;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import org.apache.storm.jms.JmsProvider;
+
+
+/**
+ * A <code>JmsProvider</code> that uses the spring framework
+ * to obtain a JMS <code>ConnectionFactory</code> and 
+ * <code>Destination</code> objects.
+ * <p/>
+ * The constructor takes three arguments:
+ * <ol>
+ * <li>A string pointing to the Spring application context file containing the JMS configuration
+ * (must be on the classpath)
+ * </li>
+ * <li>The name of the connection factory bean</li>
+ * <li>The name of the destination bean</li>
+ * </ol>
+ * 
+ *
+ *
+ */
+@SuppressWarnings("serial")
+public class SpringJmsProvider implements JmsProvider {
+	private ConnectionFactory connectionFactory;
+	private Destination destination;
+	
+	/**
+	 * Constructs a <code>SpringJmsProvider</code> object given the name of a
+	 * classpath resource (the spring application context file), and the bean
+	 * names of a JMS connection factory and destination.
+	 * 
+	 * @param appContextClasspathResource - the spring configuration file (classpath resource)
+	 * @param connectionFactoryBean - the JMS connection factory bean name
+	 * @param destinationBean - the JMS destination bean name
+	 */
+	public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){
+		ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource);
+		this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean);
+		this.destination = (Destination)context.getBean(destinationBean);
+	}
+
+	public ConnectionFactory connectionFactory() throws Exception {
+		return this.connectionFactory;
+	}
+
+	public Destination destination() throws Exception {
+		return this.destination;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/jms-activemq.xml b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
new file mode 100644
index 0000000..1a845b8
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+	<!-- ActiveMQ -->
+
+	<!-- embedded ActiveMQ Broker -->
+	<!-- <amq:broker useJmx="false" persistent="false">
+		<amq:transportConnectors>
+			<amq:transportConnector uri="tcp://localhost:61616" />
+		</amq:transportConnectors>
+	</amq:broker> -->
+
+	<amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" />
+	
+	<amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" />
+
+	<amq:connectionFactory id="jmsConnectionFactory"
+		brokerURL="tcp://localhost:61616" />
+
+	<!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
+		<property name="connectionFactory">
+			<ref bean="jmsConnectionFactory" />
+		</property>
+		<property name="pubSubDomain" value="false" />
+	</bean> -->
+	
+</beans>
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/storm-jms-examples/src/main/resources/log4j.properties b/examples/storm-jms-examples/src/main/resources/log4j.properties
new file mode 100644
index 0000000..079b195
--- /dev/null
+++ b/examples/storm-jms-examples/src/main/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n
+
+
+log4j.logger.backtype.storm.contrib=DEBUG
+log4j.logger.clojure.contrib=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.zookeeper=WARN
+

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/pom.xml b/external/storm-jms/core/pom.xml
deleted file mode 100644
index c2c04ea..0000000
--- a/external/storm-jms/core/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.storm</groupId>
-        <artifactId>storm-jms-parent</artifactId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-
-
-    <artifactId>storm-jms</artifactId>
-
-
-
-    <developers>
-        <developer>
-            <id>ptgoetz</id>
-            <name>P. Taylor Goetz</name>
-            <email>ptgoetz@gmail.com</email>
-        </developer>
-    </developers>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-core</artifactId>
-            <version>${project.version}</version>
-            <!-- keep storm out of the jar-with-dependencies -->
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-jms_1.1_spec</artifactId>
-            <version>1.1.1</version>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.10</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Active MQ -->
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-core</artifactId>
-            <version>5.5.1</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-    </dependencies>
-
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <version>2.9</version>
-                <configuration>
-                    <additionalparam>-Xdoclint:none</additionalparam>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
deleted file mode 100644
index 4932929..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms;
-
-import java.io.Serializable;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import org.apache.storm.tuple.ITuple;
-
-/**
- * JmsMessageProducer implementations are responsible for translating
- * a <code>org.apache.storm.tuple.ITuple</code> instance into a
- * <code>javax.jms.Message</code> object.
- * <p>
- */
-public interface JmsMessageProducer extends Serializable {
-
-    /**
-     * Translate a <code>org.apache.storm.tuple.Tuple</code> object
-     * to a <code>javax.jms.Message</code> object.
-     *
-     * @param session
-     * @param input
-     * @return
-     * @throws JMSException
-     */
-    public Message toMessage(Session session, ITuple input) throws JMSException;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java
deleted file mode 100644
index d976326..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms;
-
-import java.io.Serializable;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-
-/**
- * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code>
- * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage
- * a topic/queue connection over the course of its lifecycle.
- *
- */
-public interface JmsProvider extends Serializable {
-    /**
-     * Provides the JMS <code>ConnectionFactory</code>
-     *
-     * @return the connection factory
-     * @throws Exception
-     */
-    public ConnectionFactory connectionFactory() throws Exception;
-
-    /**
-     * Provides the <code>Destination</code> (topic or queue) from which the
-     * <code>JmsSpout</code> will receive messages.
-     *
-     * @return
-     * @throws Exception
-     */
-    public Destination destination() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
deleted file mode 100644
index 0bbb3a0..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms;
-
-import java.io.Serializable;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-
-/**
- * Interface to define classes that can produce Storm <code>Values</code> objects
- * from a <code>javax.jms.Message</code> object.
- * <p>
- * Implementations are also responsible for declaring the output
- * fields they produce.
- * <p>
- * If for some reason the implementation can't process a message
- * (for example if it received a <code>javax.jms.ObjectMessage</code>
- * when it was expecting a <code>javax.jms.TextMessage</code>) it should
- * return <code>null</code> to indicate to the <code>JmsSpout</code> that
- * the message could not be processed.
- *
- */
-public interface JmsTupleProducer extends Serializable {
-    /**
-     * Process a JMS message object to create a Values object.
-     *
-     * @param msg - the JMS message
-     * @return the Values tuple, or null if the message couldn't be processed.
-     * @throws JMSException
-     */
-    Values toTuple(Message msg) throws JMSException;
-
-    /**
-     * Declare the output fields produced by this JmsTupleProducer.
-     *
-     * @param declarer The OutputFieldsDeclarer for the spout.
-     */
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
deleted file mode 100644
index d691e75..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.bolt;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm
- * topology and publishes JMS Messages to a destination (topic or queue).
- * <p>
- * To use a JmsBolt in a topology, the following must be supplied:
- * <ol>
- * <li>A <code>JmsProvider</code> implementation.</li>
- * <li>A <code>JmsMessageProducer</code> implementation.</li>
- * </ol>
- * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code>
- * and <code>javax.jms.Destination</code> objects required to publish JMS messages.
- * <p>
- * The JmsBolt uses a <code>JmsMessageProducer</code> to translate
- * <code>org.apache.storm.tuple.Tuple</code> objects into
- * <code>javax.jms.Message</code> objects for publishing.
- * <p>
- * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
- * fail upon deployment to a cluster.
- * <p>
- * The JmsBolt is typically an endpoint in a topology -- in other words
- * it does not emit any tuples.
- */
-public class JmsBolt extends BaseTickTupleAwareRichBolt {
-    private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
-
-    private boolean autoAck = true;
-
-    // javax.jms objects
-    private Connection connection;
-    private Session session;
-    private MessageProducer messageProducer;
-
-    // JMS options
-    private boolean jmsTransactional = false;
-    private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-
-    private JmsProvider jmsProvider;
-    private JmsMessageProducer producer;
-
-
-    private OutputCollector collector;
-
-    /**
-     * Set the JmsProvider used to connect to the JMS destination topic/queue
-     *
-     * @param provider
-     */
-    public void setJmsProvider(JmsProvider provider) {
-        this.jmsProvider = provider;
-    }
-
-    /**
-     * Set the JmsMessageProducer used to convert tuples
-     * into JMS messages.
-     *
-     * @param producer
-     */
-    public void setJmsMessageProducer(JmsMessageProducer producer) {
-        this.producer = producer;
-    }
-
-    /**
-     * Sets the JMS acknowledgement mode for JMS messages sent
-     * by this bolt.
-     * <p>
-     * Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     *
-     * @param acknowledgeMode (constant defined in javax.jms.Session)
-     */
-    public void setJmsAcknowledgeMode(int acknowledgeMode) {
-        this.jmsAcknowledgeMode = acknowledgeMode;
-    }
-
-    /**
-     * Set the JMS transactional setting for the JMS session.
-     *
-     * @param transactional
-     */
-//	public void setJmsTransactional(boolean transactional){
-//		this.jmsTransactional = transactional;
-//	}
-
-    /**
-     * Sets whether or not tuples should be acknowledged by this
-     * bolt.
-     * <p>
-     *
-     * @param autoAck
-     */
-    public void setAutoAck(boolean autoAck) {
-        this.autoAck = autoAck;
-    }
-
-
-    /**
-     * Consumes a tuple and sends a JMS message.
-     * <p>
-     * If autoAck is true, the tuple will be acknowledged
-     * after the message is sent.
-     * <p>
-     * If JMS sending fails, the tuple will be failed.
-     */
-    @Override
-    protected void process(Tuple input) {
-        // write the tuple to a JMS destination...
-        LOG.debug("Tuple received. Sending JMS message.");
-
-        try {
-            Message msg = this.producer.toMessage(this.session, input);
-            if (msg != null) {
-                if (msg.getJMSDestination() != null) {
-                    this.messageProducer.send(msg.getJMSDestination(), msg);
-                } else {
-                    this.messageProducer.send(msg);
-                }
-            }
-            if (this.autoAck) {
-                LOG.debug("ACKing tuple: " + input);
-                this.collector.ack(input);
-            }
-        } catch (JMSException e) {
-            // failed to send the JMS message, fail the tuple fast
-            LOG.warn("Failing tuple: " + input);
-            LOG.warn("Exception: ", e);
-            this.collector.fail(input);
-        }
-    }
-
-    /**
-     * Releases JMS resources.
-     */
-    @Override
-    public void cleanup() {
-        try {
-            LOG.debug("Closing JMS connection.");
-            this.session.close();
-            this.connection.close();
-        } catch (JMSException e) {
-            LOG.warn("Error closing JMS connection.", e);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    }
-
-    /**
-     * Initializes JMS resources.
-     */
-    @Override
-    public void prepare(Map stormConf, TopologyContext context,
-                        OutputCollector collector) {
-        if (this.jmsProvider == null || this.producer == null) {
-            throw new IllegalStateException("JMS Provider and MessageProducer not set.");
-        }
-        this.collector = collector;
-        LOG.debug("Connecting JMS..");
-        try {
-            ConnectionFactory cf = this.jmsProvider.connectionFactory();
-            Destination dest = this.jmsProvider.destination();
-            this.connection = cf.createConnection();
-            this.session = connection.createSession(this.jmsTransactional,
-                    this.jmsAcknowledgeMode);
-            this.messageProducer = session.createProducer(dest);
-
-            connection.start();
-        } catch (Exception e) {
-            LOG.warn("Error creating JMS connection.", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
deleted file mode 100644
index b78a41e..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.Serializable;
-
-public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {
-
-    private String jmsID;
-
-    private Long sequence;
-
-    public JmsMessageID(long sequence, String jmsID){
-        this.jmsID = jmsID;
-        this.sequence = sequence;
-    }
-
-
-    public String getJmsID(){
-        return this.jmsID;
-    }
-
-    @Override
-    public int compareTo(JmsMessageID jmsMessageID) {
-        return (int)(this.sequence - jmsMessageID.sequence);
-    }
-
-    @Override
-    public int hashCode() {
-        return this.sequence.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if(o instanceof JmsMessageID){
-            JmsMessageID id = (JmsMessageID)o;
-            return this.jmsID.equals(id.jmsID);
-        } else {
-            return false;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
deleted file mode 100644
index 6aaa7c9..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.spout;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-/**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue
- * and outputs tuples based on the messages it receives.
- * <p>
- * <code>JmsSpout</code> instances rely on <code>JmsProvider</code> implementations
- * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects
- * necessary to connect to a JMS topic/queue.
- * <p>
- * When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the
- * incoming message.
- * <p>
- * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation
- * appropriate for the expected message content.
- */
-@SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
-    private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
-
-    // JMS options
-    private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-    private boolean distributed = true;
-
-    private JmsTupleProducer tupleProducer;
-
-    private JmsProvider jmsProvider;
-
-    private LinkedBlockingQueue<Message> queue;
-    private TreeSet<JmsMessageID> toCommit;
-    private HashMap<JmsMessageID, Message> pendingMessages;
-    private long messageSequence = 0;
-
-    private SpoutOutputCollector collector;
-
-    private transient Connection connection;
-    private transient Session session;
-
-    private boolean hasFailures = false;
-    public final Serializable recoveryMutex = "RECOVERY_MUTEX";
-    private Timer recoveryTimer = null;
-    private long recoveryPeriod = -1; // default to disabled
-
-    /**
-     * Sets the JMS Session acknowledgement mode for the JMS session associated with this spout.
-     * <p>
-     * Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     *
-     * @param mode JMS Session Acknowledgement mode
-     * @throws IllegalArgumentException if the mode is not recognized.
-     */
-    public void setJmsAcknowledgeMode(int mode) {
-        switch (mode) {
-            case Session.AUTO_ACKNOWLEDGE:
-            case Session.CLIENT_ACKNOWLEDGE:
-            case Session.DUPS_OK_ACKNOWLEDGE:
-                break;
-            default:
-                throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)");
-
-        }
-        this.jmsAcknowledgeMode = mode;
-    }
-
-    /**
-     * Returns the JMS Session acknowledgement mode for the JMS session associated with this spout.
-     *
-     * @return
-     */
-    public int getJmsAcknowledgeMode() {
-        return this.jmsAcknowledgeMode;
-    }
-
-    /**
-     * Set the <code>JmsProvider</code>
-     * implementation that this Spout will use to connect to
-     * a JMS <code>javax.jms.Destination</code>
-     *
-     * @param provider
-     */
-    public void setJmsProvider(JmsProvider provider) {
-        this.jmsProvider = provider;
-    }
-
-    /**
-     * Set the <code>JmsTupleProducer</code>
-     * implementation that will convert <code>javax.jms.Message</code>
-     * object to <code>org.apache.storm.tuple.Values</code> objects
-     * to be emitted.
-     *
-     * @param producer
-     */
-    public void setJmsTupleProducer(JmsTupleProducer producer) {
-        this.tupleProducer = producer;
-    }
-
-    /**
-     * <code>javax.jms.MessageListener</code> implementation.
-     * <p>
-     * Stores the JMS message in an internal queue for processing
-     * by the <code>nextTuple()</code> method.
-     */
-    public void onMessage(Message msg) {
-        try {
-            LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
-        } catch (JMSException e) {
-        }
-        this.queue.offer(msg);
-    }
-
-    /**
-     * <code>ISpout</code> implementation.
-     * <p>
-     * Connects the JMS spout to the configured JMS destination
-     * topic/queue.
-     */
-    @SuppressWarnings("rawtypes")
-    public void open(Map conf, TopologyContext context,
-                     SpoutOutputCollector collector) {
-        if (this.jmsProvider == null) {
-            throw new IllegalStateException("JMS provider has not been set.");
-        }
-        if (this.tupleProducer == null) {
-            throw new IllegalStateException("JMS Tuple Producer has not been set.");
-        }
-        Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs");
-        // TODO find a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
-        topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
-        if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) {
-            LOG.warn("*** WARNING *** : " +
-                    "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " +
-                    "'topology.message.timeout.secs' of " + topologyTimeout +
-                    " secs. This could lead to a message replay flood!");
-        }
-        this.queue = new LinkedBlockingQueue<Message>();
-        this.toCommit = new TreeSet<JmsMessageID>();
-        this.pendingMessages = new HashMap<JmsMessageID, Message>();
-        this.collector = collector;
-        try {
-            ConnectionFactory cf = this.jmsProvider.connectionFactory();
-            Destination dest = this.jmsProvider.destination();
-            this.connection = cf.createConnection();
-            this.session = connection.createSession(false,
-                    this.jmsAcknowledgeMode);
-            MessageConsumer consumer = session.createConsumer(dest);
-            consumer.setMessageListener(this);
-            this.connection.start();
-            if (this.isDurableSubscription() && this.recoveryPeriod > 0) {
-                this.recoveryTimer = new Timer();
-                this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
-            }
-
-        } catch (Exception e) {
-            LOG.warn("Error creating JMS connection.", e);
-        }
-
-    }
-
-    public void close() {
-        try {
-            LOG.debug("Closing JMS connection.");
-            this.session.close();
-            this.connection.close();
-        } catch (JMSException e) {
-            LOG.warn("Error closing JMS connection.", e);
-        }
-
-    }
-
-    public void nextTuple() {
-        Message msg = this.queue.poll();
-        if (msg == null) {
-            Utils.sleep(50);
-        } else {
-
-            LOG.debug("sending tuple: " + msg);
-            // get the tuple from the handler
-            try {
-                Values vals = this.tupleProducer.toTuple(msg);
-                // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
-                LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
-                LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
-                if (this.isDurableSubscription()) {
-                    LOG.debug("Requesting acks.");
-                    JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
-                    this.collector.emit(vals, messageId);
-
-                    // at this point we successfully emitted. Store
-                    // the message and message ID so we can do a
-                    // JMS acknowledge later
-                    this.pendingMessages.put(messageId, msg);
-                    this.toCommit.add(messageId);
-                } else {
-                    this.collector.emit(vals);
-                }
-            } catch (JMSException e) {
-                LOG.warn("Unable to convert JMS message: " + msg);
-            }
-
-        }
-
-    }
-
-    /*
-     * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
-     */
-    public void ack(Object msgId) {
-
-        Message msg = this.pendingMessages.remove(msgId);
-        JmsMessageID oldest = this.toCommit.first();
-        if (msgId.equals(oldest)) {
-            if (msg != null) {
-                try {
-                    LOG.debug("Committing...");
-                    msg.acknowledge();
-                    LOG.debug("JMS Message acked: " + msgId);
-                    this.toCommit.remove(msgId);
-                } catch (JMSException e) {
-                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
-                }
-            } else {
-                LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId);
-            }
-        } else {
-            this.toCommit.remove(msgId);
-        }
-
-    }
-
-    /*
-     * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
-     */
-    public void fail(Object msgId) {
-        LOG.warn("Message failed: " + msgId);
-        this.pendingMessages.clear();
-        this.toCommit.clear();
-        synchronized (this.recoveryMutex) {
-            this.hasFailures = true;
-        }
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        this.tupleProducer.declareOutputFields(declarer);
-
-    }
-
-    /**
-     * Returns <code>true</code> if the spout has received failures
-     * from which it has not yet recovered.
-     */
-    public boolean hasFailures() {
-        return this.hasFailures;
-    }
-
-    protected void recovered() {
-        this.hasFailures = false;
-    }
-
-    /**
-     * Sets the periodicity of the timer task that
-     * checks for failures and recovers the JMS session.
-     *
-     * @param period
-     */
-    public void setRecoveryPeriod(long period) {
-        this.recoveryPeriod = period;
-    }
-
-    public boolean isDistributed() {
-        return this.distributed;
-    }
-
-    /**
-     * Sets the "distributed" mode of this spout.
-     * <p>
-     * If <code>true</code> multiple instances of this spout <i>may</i> be
-     * created across the cluster (depending on the "parallelism_hint" in the topology configuration).
-     * <p>
-     * Setting this value to <code>false</code> essentially means this spout will run as a singleton
-     * within the cluster ("parallelism_hint" will be ignored).
-     * <p>
-     * In general, this should be set to <code>false</code> if the underlying JMS destination is a
-     * topic, and <code>true</code> if it is a JMS queue.
-     *
-     * @param distributed
-     */
-    public void setDistributed(boolean distributed) {
-        this.distributed = distributed;
-    }
-
-
-    private static final String toDeliveryModeString(int deliveryMode) {
-        switch (deliveryMode) {
-            case Session.AUTO_ACKNOWLEDGE:
-                return "AUTO_ACKNOWLEDGE";
-            case Session.CLIENT_ACKNOWLEDGE:
-                return "CLIENT_ACKNOWLEDGE";
-            case Session.DUPS_OK_ACKNOWLEDGE:
-                return "DUPS_OK_ACKNOWLEDGE";
-            default:
-                return "UNKNOWN";
-
-        }
-    }
-
-    protected Session getSession() {
-        return this.session;
-    }
-
-    private boolean isDurableSubscription() {
-        return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
-    }
-
-
-    private class RecoveryTask extends TimerTask {
-        private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
-
-        public void run() {
-            synchronized (JmsSpout.this.recoveryMutex) {
-                if (JmsSpout.this.hasFailures()) {
-                    try {
-                        LOG.info("Recovering from a message failure.");
-                        JmsSpout.this.getSession().recover();
-                        JmsSpout.this.recovered();
-                    } catch (JMSException e) {
-                        LOG.warn("Could not recover jms session.", e);
-                    }
-                }
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
deleted file mode 100644
index c990058..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-/**
- * Batch coordination metadata object for the TridentJmsSpout.
- * This implementation does not use batch metadata, so the class is
- * intentionally empty: it declares no fields and no methods.
- */
-public class JmsBatch {
-    // Empty class
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
deleted file mode 100644
index bfb78b5..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsState.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.jms.JmsMessageProducer;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.topology.FailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import javax.jms.*;
-import java.io.Serializable;
-import java.lang.IllegalStateException;
-import java.util.List;
-
-/**
- * Trident {@link State} that publishes the tuples of each batch to a JMS destination.
- * <p>
- * The JMS connection, session and producer are created by {@link #prepare()} from the
- * supplied {@link Options}. Tuples are converted to JMS messages by the configured
- * {@link JmsMessageProducer} and sent in {@link #updateState(List, TridentCollector)}.
- * When the session is transactional it is committed in {@link #commit(Long)}.
- */
-public class JmsState implements State {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JmsState.class);
-
-    private Options options;
-    private Connection connection;
-    private Session session;
-    private MessageProducer messageProducer;
-
-    /**
-     * Creates a state that is not yet connected; call {@link #prepare()} before use.
-     *
-     * @param options JMS provider, message producer and session settings
-     */
-    protected JmsState(Options options) {
-        this.options = options;
-    }
-
-    /**
-     * Serializable, fluent configuration holder for {@link JmsState}.
-     * Defaults: acknowledge mode {@link Session#AUTO_ACKNOWLEDGE}, transactional session.
-     */
-    public static class Options implements Serializable {
-        private JmsProvider jmsProvider;
-        private JmsMessageProducer msgProducer;
-        private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-        private boolean jmsTransactional = true;
-
-        /**
-         * @param provider source of the JMS connection factory and destination
-         * @return this instance, for chaining
-         */
-        public Options withJmsProvider(JmsProvider provider) {
-            this.jmsProvider = provider;
-            return this;
-        }
-
-        /**
-         * @param msgProducer converter from Trident tuples to JMS messages
-         * @return this instance, for chaining
-         */
-        public Options withMessageProducer(JmsMessageProducer msgProducer) {
-            this.msgProducer = msgProducer;
-            return this;
-        }
-
-        /**
-         * @param jmsAcknowledgeMode acknowledge mode passed to {@code Connection.createSession}
-         * @return this instance, for chaining
-         */
-        public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
-            this.jmsAcknowledgeMode = jmsAcknowledgeMode;
-            return this;
-        }
-
-        /**
-         * @param jmsTransactional whether the JMS session is created as transacted
-         * @return this instance, for chaining
-         */
-        public Options withJmsTransactional(boolean jmsTransactional) {
-            this.jmsTransactional = jmsTransactional;
-            return this;
-        }
-    }
-
-    /**
-     * Creates the JMS connection, session and message producer and starts the connection.
-     * <p>
-     * Any exception raised while connecting is logged at WARN and not rethrown; fields
-     * that were not assigned before the failure remain {@code null}.
-     *
-     * @throws IllegalStateException if the JMS provider or the message producer is not set
-     */
-    protected void prepare() {
-        if (this.options.jmsProvider == null || this.options.msgProducer == null) {
-            throw new IllegalStateException("JMS Provider and MessageProducer not set.");
-        }
-        LOG.debug("Connecting JMS..");
-        try {
-            ConnectionFactory cf = this.options.jmsProvider.connectionFactory();
-            Destination dest = this.options.jmsProvider.destination();
-            this.connection = cf.createConnection();
-            this.session = connection.createSession(this.options.jmsTransactional,
-                    this.options.jmsAcknowledgeMode);
-            this.messageProducer = session.createProducer(dest);
-
-            connection.start();
-        } catch (Exception e) {
-            LOG.warn("Error creating JMS connection.", e);
-        }
-    }
-
-    /** No-op: nothing has to happen at the start of a Trident commit. */
-    @Override
-    public void beginCommit(Long aLong) {
-    }
-
-    /**
-     * Commits the JMS session when it is transactional.
-     * A failed commit is logged at ERROR and not propagated.
-     */
-    @Override
-    public void commit(Long aLong) {
-        LOG.debug("Committing JMS transaction.");
-        if (this.options.jmsTransactional) {
-            try {
-                session.commit();
-            } catch (JMSException e) {
-                LOG.error("JMS Session commit failed.", e);
-            }
-        }
-    }
-
-    /**
-     * Converts each tuple to a JMS message and sends it.
-     * <p>
-     * Tuples for which the message producer returns {@code null} are skipped. A message
-     * that carries its own JMS destination is sent there; otherwise the producer's
-     * default destination is used.
-     * <p>
-     * On a send failure a transactional session is rolled back. A failure of the rollback
-     * itself is logged and attached to the original exception as suppressed, so the
-     * original cause is never lost and a {@link FailedException} is always raised.
-     *
-     * @param tuples    the tuples of the current batch
-     * @param collector Trident collector; not used by this implementation
-     * @throws FailedException if converting or sending any message fails
-     * @throws JMSException    declared for backward compatibility with existing callers
-     */
-    public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException {
-        try {
-            for (TridentTuple tuple : tuples) {
-                Message msg = this.options.msgProducer.toMessage(this.session, tuple);
-                if (msg == null) {
-                    continue;
-                }
-                Destination msgDestination = msg.getJMSDestination();
-                if (msgDestination != null) {
-                    this.messageProducer.send(msgDestination, msg);
-                } else {
-                    this.messageProducer.send(msg);
-                }
-            }
-        } catch (JMSException e) {
-            LOG.warn("Failed to send jms message for a trident batch ", e);
-            if (this.options.jmsTransactional) {
-                try {
-                    session.rollback();
-                } catch (JMSException rollbackFailure) {
-                    // Keep the send failure as the primary cause; do not let it be masked.
-                    LOG.warn("Failed to roll back JMS session after send failure.", rollbackFailure);
-                    e.addSuppressed(rollbackFailure);
-                }
-            }
-            throw new FailedException("Failed to write tuples", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
deleted file mode 100644
index 9a02ba9..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-
-import java.util.Map;
-
-/**
- * {@link StateFactory} that produces {@link JmsState} instances from a fixed
- * set of {@link JmsState.Options}.
- */
-public class JmsStateFactory implements StateFactory {
-
-    private JmsState.Options options;
-
-    /**
-     * @param options settings handed to every {@link JmsState} this factory creates
-     */
-    public JmsStateFactory(JmsState.Options options) {
-        this.options = options;
-    }
-
-    /**
-     * Builds a new {@link JmsState} with this factory's options and calls
-     * {@code prepare()} on it before handing it back. None of the method
-     * arguments are consulted.
-     *
-     * @return the newly created and prepared state
-     */
-    @Override
-    public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
-        final JmsState jmsState = new JmsState(this.options);
-        jmsState.prepare();
-        return jmsState;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
deleted file mode 100644
index a2709a4..0000000
--- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jms.trident;
-
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-import javax.jms.JMSException;
-import java.util.List;
-
-/**
- * Trident state updater that hands each batch of tuples to a {@link JmsState}.
- */
-public class JmsUpdater extends BaseStateUpdater<JmsState>  {
-
-    /**
-     * Delegates the batch to {@code jmsState.updateState(tuples, collector)}.
-     *
-     * @param jmsState  the state that sends the tuples
-     * @param tuples    the tuples of the current batch
-     * @param collector Trident collector, passed through to the state
-     * @throws FailedException if the state reports a {@link JMSException}; the
-     *                         original exception is kept as the cause
-     */
-    @Override
-    public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) {
-        try {
-            jmsState.updateState(tuples, collector);
-        } catch (JMSException e) {
-            throw new FailedException("failed JMS operation", e);
-        }
-    }
-}