Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/28 01:44:29 UTC

[3/6] storm git commit: STORM-2468: Remove clojure from storm-client

STORM-2468: Remove clojure from storm-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4cf917d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4cf917d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4cf917d

Branch: refs/heads/master
Commit: a4cf917d35352211a80502e04b08d3318cfeb6d1
Parents: 487a246
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 10 07:51:02 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Apr 14 20:51:43 2017 -0500

----------------------------------------------------------------------
 docs/Serialization.md                           |  10 +-
 .../org/apache/storm/starter/clj/bolts_test.clj |  26 +-
 storm-client/pom.xml                            |  13 +-
 .../org/apache/storm/clojure/ClojureBolt.java   | 120 -----
 .../org/apache/storm/clojure/ClojureSpout.java  | 153 -------
 .../org/apache/storm/clojure/RichShellBolt.java |  51 ---
 .../apache/storm/clojure/RichShellSpout.java    |  51 ---
 .../org/apache/storm/cluster/IStateStorage.java |   1 -
 .../storm/cluster/StormClusterStateImpl.java    |   6 -
 .../storm/executor/bolt/BoltExecutor.java       |   4 +-
 .../executor/bolt/BoltOutputCollectorImpl.java  |   9 +-
 .../storm/metric/api/IMetricsConsumer.java      |   8 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../serialization/SerializationRegister.java    |  35 ++
 .../jvm/org/apache/storm/spout/ShellSpout.java  |   6 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  52 +--
 .../jvm/org/apache/storm/task/ShellBolt.java    |   6 +-
 .../storm/testing/KeyedSummingBatchBolt.java    |   3 +-
 .../storm/trident/operation/builtin/Sum.java    |  24 +-
 .../storm/trident/tuple/TridentTupleView.java   |  21 +-
 .../src/jvm/org/apache/storm/tuple/Tuple.java   |   6 +
 .../jvm/org/apache/storm/tuple/TupleImpl.java   | 235 ++++------
 .../storm/utils/IndifferentAccessMap.java       | 177 --------
 .../src/jvm/org/apache/storm/utils/Utils.java   |  10 -
 storm-clojure/pom.xml                           |  43 +-
 .../org/apache/storm/clojure/ClojureBolt.java   | 119 +++++
 .../clojure/ClojureSerializationRegister.java   |  33 ++
 .../org/apache/storm/clojure/ClojureSpout.java  | 154 +++++++
 .../org/apache/storm/clojure/ClojureTuple.java  | 439 +++++++++++++++++++
 .../org/apache/storm/clojure/ClojureUtil.java   |  31 ++
 .../storm/clojure/IndifferentAccessMap.java     | 176 ++++++++
 .../org/apache/storm/clojure/RichShellBolt.java |  51 +++
 .../apache/storm/clojure/RichShellSpout.java    |  51 +++
 ...he.storm.serialization.SerializationRegister |   1 +
 .../test/clj/org/apache/storm/tuple_test.clj    |  53 +++
 storm-clojure/test/resources/log4j2-test.xml    |  32 ++
 storm-clojure/test/resources/test_runner.clj    | 114 +++++
 storm-core/pom.xml                              | 193 +-------
 storm-core/src/clj/org/apache/storm/config.clj  |  37 --
 .../clj/org/apache/storm/daemon/logviewer.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon_config.clj  |  30 ++
 .../clj/org/apache/storm/internal/clojure.clj   | 204 ---------
 storm-core/src/clj/org/apache/storm/log.clj     |  34 --
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 134 ------
 .../org/apache/storm/integration_test.clj       |   2 +-
 .../org/apache/storm/testing4j_test.clj         |   4 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |   2 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 .../clj/org/apache/storm/logviewer_test.clj     |   2 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   2 +-
 .../scheduler/multitenant_scheduler_test.clj    |   2 +-
 .../apache/storm/security/auth/auth_test.clj    |   2 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   2 +-
 .../clj/org/apache/storm/transactional_test.clj |   2 +-
 .../test/clj/org/apache/storm/tuple_test.clj    |  52 ---
 storm-core/test/resources/test_runner.clj       | 114 -----
 58 files changed, 1563 insertions(+), 1604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/docs/Serialization.md
----------------------------------------------------------------------
diff --git a/docs/Serialization.md b/docs/Serialization.md
index c2e129b..e35a0f9 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -9,7 +9,7 @@ Tuples can be comprised of objects of any types. Since Storm is a distributed sy
 
 Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations.
 
-By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, HashSet, and the Clojure collection types. If you want to use another type in your tuples, you'll need to register a custom serializer.
+By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, and HashSet. If you want to use another type in your tuples, you'll need to register a custom serializer.
 
 ### Dynamic typing
 
@@ -25,7 +25,7 @@ Finally, another reason for using dynamic typing is so Storm can be used in a st
 
 As mentioned, Storm uses Kryo for serialization. To implement custom serializers, you need to register new serializers with Kryo. It's highly recommended that you read over [Kryo's home page](https://github.com/EsotericSoftware/kryo) to understand how it handles custom serialization.
 
-Adding custom serializers is done through the "topology.kryo.register" property in your topology config. It takes a list of registrations, where each registration can take one of two forms:
+Adding custom serializers is done through the "topology.kryo.register" property in your topology config or through a ServiceLoader described later. The config takes a list of registrations, where each registration can take one of two forms:
 
 1. The name of a class to register. In this case, Storm will use Kryo's `FieldsSerializer` to serialize the class. This may or may not be optimal for the class -- see the Kryo docs for more details.
 2. A map from the name of a class to register to an implementation of [com.esotericsoftware.kryo.Serializer](https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java).
@@ -45,6 +45,12 @@ Storm provides helpers for registering serializers in a topology config. The [Co
 
 There's an advanced config called `Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS`. If you set this to true, Storm will ignore any serializations that are registered but do not have their code available on the classpath. Otherwise, Storm will throw errors when it can't find a serialization. This is useful if you run many topologies on a cluster that each have different serializations, but you want to declare all the serializations across all topologies in the `storm.yaml` files.
 
+#### SerializationRegister Service Loader
+
+If you want to provide language bindings to Storm, have a library that should interact cleanly with Storm, or have some other reason to provide serialization bindings without forcing users to update their configs, you can use the org.apache.storm.serialization.SerializationRegister service loader.
+
+You may use this like any other service loader, and Storm will register the bindings without requiring users to update their configs. The storm-clojure package uses this to provide transparent support for Clojure types.
+
 ### Java serialization
 
 If Storm encounters a type for which it doesn't have a serialization registered, it will use Java serialization if possible. If the object can't be serialized with Java serialization, then Storm will throw an error.
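
For context on the SerializationRegister hook documented above, the following is a minimal, illustrative sketch that is not part of this commit: the package and class names are made up, and it assumes a hypothetical library that wants to register an extra Kryo serializer without asking users to touch topology.kryo.register.

    package org.example.storm.bindings;   // hypothetical package, for illustration only

    import java.math.BigDecimal;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigDecimalSerializer;

    import org.apache.storm.serialization.SerializationRegister;

    // Registers extra Kryo serializers without requiring users to edit their topology config.
    public class ExampleSerializationRegister implements SerializationRegister {
        @Override
        public void register(Kryo kryo) throws Exception {
            // Register whatever types this library places into tuples.
            kryo.register(BigDecimal.class, new BigDecimalSerializer());
        }
    }

The implementation would then be advertised through the standard ServiceLoader descriptor, i.e. a resource named META-INF/services/org.apache.storm.serialization.SerializationRegister containing the single line org.example.storm.bindings.ExampleSerializationRegister. SerializationFactory.getKryo picks it up via the ServiceLoader loop shown in the SerializationFactory diff below, which is how the storm-clojure module registers its Clojure serializers.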

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
index 164e7ac..745678c 100644
--- a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -20,10 +20,12 @@
             [org.apache.storm.starter.clj.bolts :refer
              [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]]
             [org.apache.storm [testing :refer :all]])
-  (:import [org.apache.storm Constants]
+  (:import [org.apache.storm Constants Testing]
+           [org.apache.storm.testing MkTupleParam]
            [org.apache.storm.task OutputCollector IOutputCollector]
            [org.apache.storm.starter.tools Rankable]
-           [org.apache.storm.tuple Tuple]))
+           [org.apache.storm.tuple Tuple]
+           [java.util ArrayList]))
 
 (defn execute-tuples [bolt tuples]
   (let [out (atom [])]
@@ -40,21 +42,11 @@
 
 (defn- mock-tuple [m & {component :component stream-id :stream-id
                         :or {component "1" stream-id "1"}}]
-  (reify
-    Tuple
-    (getSourceComponent [_]
-      component)
-    (getSourceStreamId [_]
-      stream-id)
-    (getString [this i]
-      (nth (vals m) 0))
-    (getValues [_]
-      (vals m))
-    clojure.lang.IPersistentMap
-    (valAt [_ key]
-      (get m key))
-    (seq [_]
-      (seq m))))
+  (let [param (MkTupleParam.)]
+    (.setStream param stream-id)
+    (.setComponent param component)
+    (.setFieldsList param (ArrayList. (.keySet m)))
+    (Testing/testTuple (ArrayList. (.values m)) param)))
 
 (def ^{:private true} tick-tuple
   (mock-tuple {}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 9813593..ba27376 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -124,17 +124,6 @@
             <artifactId>snakeyaml</artifactId>
         </dependency>
 
-        <!-- clojure -->
-        <dependency>
-            <groupId>org.clojure</groupId>
-            <artifactId>clojure</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.twitter</groupId>
-            <artifactId>carbonite</artifactId>
-            <scope>compile</scope>
-        </dependency>
-
         <!-- netty -->
         <dependency>
             <groupId>io.netty</groupId>
@@ -333,4 +322,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
deleted file mode 100644
index 60300e2..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import org.apache.storm.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class ClojureBolt implements IRichBolt, FinishedCallback {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    IBolt _bolt;
-    
-    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-
-    @Override
-    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(stormConf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _bolt.prepare(stormConf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        _bolt.execute(input);
-    }
-
-    @Override
-    public void cleanup() {
-            try {
-                _bolt.cleanup();
-            } catch(AbstractMethodError ame) {
-                
-            }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-
-    @Override
-    public void finishedId(Object id) {
-        if(_bolt instanceof FinishedCallback) {
-            ((FinishedCallback) _bolt).finishedId(id);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
deleted file mode 100644
index 372b306..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.spout.ISpout;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureSpout implements IRichSpout {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    ISpout _spout;
-    
-    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-    
-
-    @Override
-    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(conf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _spout = (ISpout) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _spout.open(conf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            _spout.close();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        try {
-            _spout.nextTuple();
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        try {
-            _spout.ack(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        try {
-            _spout.fail(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-    
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void activate() {
-        try {
-            _spout.activate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        try {
-            _spout.deactivate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
deleted file mode 100644
index 6de5637..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.ShellBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellBolt extends ShellBolt implements IRichBolt {
-    private Map<String, StreamInfo> _outputs;
-    
-    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-    
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));                
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
deleted file mode 100644
index 9fb7e73..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.spout.ShellSpout;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellSpout extends ShellSpout implements IRichSpout {
-    private Map<String, StreamInfo> _outputs;
-
-    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
index aa731ff..a004d25 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -18,7 +18,6 @@
 package org.apache.storm.cluster;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.callback.ZKStateChangedCallback;

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 51caad9..1655323 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -503,12 +503,6 @@ public class StormClusterStateImpl implements IStormClusterState {
         stateStorage.set_data(path, Utils.serialize(stormBase), acls);
     }
 
-    /**
-     * To update this function due to APersistentMap/APersistentSet is clojure's structure
-     * 
-     * @param stormId
-     * @param newElems
-     */
     @Override
     public void updateStorm(String stormId, StormBase newElems) {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 97b16d8..c3a18ad 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -115,7 +115,7 @@ public class BoltExecutor extends Executor {
             IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
             boolean isSampled = sampler.call();
             boolean isExecuteSampler = executeSampler.call();
-            Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+            Long now = (isSampled || isExecuteSampler) ? Time.currentTimeMillis() : null;
             if (isSampled) {
                 tuple.setProcessSampleStartTime(now);
             }
@@ -130,7 +130,7 @@ public class BoltExecutor extends Executor {
                 LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
             }
             new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
-            if (delta != 0) {
+            if (delta >= 0) {
                 ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index c490c3d..46b945b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -115,7 +115,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
         }
         BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
         boltAckInfo.applyOn(taskData.getUserContext());
-        if (delta != 0) {
+        if (delta >= 0) {
             ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
                     input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
@@ -134,7 +134,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
         }
         BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
         boltFailInfo.applyOn(taskData.getUserContext());
-        if (delta != 0) {
+        if (delta >= 0) {
             ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
                     input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
@@ -156,9 +156,10 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
 
     private long tupleTimeDelta(TupleImpl tuple) {
         Long ms = tuple.getProcessSampleStartTime();
-        if (ms != null)
+        if (ms != null) {
             return Time.deltaMs(ms);
-        return 0;
+        }
+        return -1;
     }
 
     private void putXor(Map<Long, Long> pending, Long key, Long id) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java b/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
index e7ca10d..f2623be 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
@@ -39,7 +39,13 @@ public interface IMetricsConsumer {
         public String srcComponentId; 
         public int srcTaskId; 
         public long timestamp;
-        public int updateIntervalSecs; 
+        public int updateIntervalSecs;
+
+        @Override
+        public String toString() {
+            return "TASK_INFO: { host: " + srcWorkerHost + ":" + srcWorkerPort +
+                    " comp: " + srcComponentId + "["+ srcTaskId + "]}";
+        }
     }
 
     // We can't move this to outside without breaking backward compatibility.

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index e3c15ac..fbe4cfa 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -29,7 +29,6 @@ import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ListDelegate;
 import org.apache.storm.utils.ReflectionUtils;
-import carbonite.JavaBridge;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
@@ -40,12 +39,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.TreeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SerializationFactory {
     public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class);
+    public static final ServiceLoader<SerializationRegister> loader = ServiceLoader.load(SerializationRegister.class);
 
     public static Kryo getKryo(Map conf) {
         IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
@@ -71,11 +72,15 @@ public class SerializationFactory {
         k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
         k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
         k.register(ConsList.class);
-        try {
-            JavaBridge.registerPrimitives(k);
-            JavaBridge.registerCollections(k);
-        } catch(Exception e) {
-            throw new RuntimeException(e);
+        
+        synchronized (loader) {
+            for (SerializationRegister sr: loader) {
+                try {
+                    sr.register(k);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
         Map<String, String> registrations = normalizeKryoRegister(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
new file mode 100644
index 0000000..3834c33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+
+/**
+ * Provides a way, via a service loader, to register Kryo
+ * serializers with the SerializationFactory without needing
+ * to modify the config.  This allows language binding
+ * libraries or platforms to include their own registrations
+ * without impacting a client's config.
+ */
+public interface SerializationRegister {
+    /**
+     * Register any serializers needed with the kryo instance
+     * @param kryo what to register the serializers with.
+     */
+    public void register(Kryo kryo) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
index a5ec72b..72b7f17 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
@@ -24,6 +24,7 @@ import org.apache.storm.metric.api.rpc.IShellMetric;
 import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.multilang.SpoutMsg;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ShellProcess;
 
 import java.util.Arrays;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,9 +98,9 @@ public class ShellSpout implements ISpout {
         _context = context;
 
         if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
         } else {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
         }
 
         _process = new ShellProcess(_command);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
index 0ab48f0..6420546 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -641,12 +641,11 @@ public class StatsUtil {
 
     /**
      * aggregate topo executors stats
-     * TODO: change clojure maps to java HashMap's when nimbus.clj is translated to java
      *
      * @param topologyId     topology id
-     * @param exec2nodePort  executor -> host+port, note it's a clojure map
-     * @param task2component task -> component, note it's a clojure map
-     * @param beats          executor[start, end] -> executor heartbeat, note it's a java HashMap
+     * @param exec2nodePort  executor -> host+port
+     * @param task2component task -> component
+     * @param beats          executor[start, end] -> executor heartbeat
      * @param topology       storm topology
      * @param window         the window to be aggregated
      * @param includeSys     whether to include system streams
@@ -1232,8 +1231,8 @@ public class StatsUtil {
     /**
      * aggregate component executor stats
      *
-     * @param exec2hostPort  a Map of {executor -> host+port}, note it's a clojure map
-     * @param task2component a Map of {task id -> component}, note it's a clojure map
+     * @param exec2hostPort  a Map of {executor -> host+port}
+     * @param task2component a Map of {task id -> component}
      * @param beats          a converted HashMap of executor heartbeats, {executor -> heartbeat}
      * @param window         specified window
      * @param includeSys     whether to include system streams
@@ -1258,9 +1257,9 @@ public class StatsUtil {
      *
      * @param topologyId       topology id
      * @param topology         storm topology
-     * @param task2component   a Map of {task id -> component}, note it's a clojure map
+     * @param task2component   a Map of {task id -> component}
      * @param beats            a converted HashMap of executor heartbeats, {executor -> heartbeat}
-     * @param exec2hostPort    a Map of {executor -> host+port}, note it's a clojure map
+     * @param exec2hostPort    a Map of {executor -> host+port}
      * @param includeSys       whether to include system streams
      * @param userAuthorized   whether the user is authorized to view topology info
      * @param filterSupervisor if not null, only return WorkerSummaries for that supervisor
@@ -1363,9 +1362,9 @@ public class StatsUtil {
      *
      * @param topologyId       topology id
      * @param topology         storm topology
-     * @param task2component   a Map of {task id -> component}, note it's a clojure map
+     * @param task2component   a Map of {task id -> component}
      * @param beats            a converted HashMap of executor heartbeats, {executor -> heartbeat}
-     * @param exec2hostPort    a Map of {executor -> host+port}, note it's a clojure map
+     * @param exec2hostPort    a Map of {executor -> host+port}
      * @param includeSys       whether to include system streams
      * @param userAuthorized   whether the user is authorized to view topology info
      *
@@ -1499,8 +1498,8 @@ public class StatsUtil {
     /**
      * extract a list of host port info for specified component
      *
-     * @param exec2hostPort  {executor -> host+port}, note it's a clojure map
-     * @param task2component {task id -> component}, note it's a clojure map
+     * @param exec2hostPort  {executor -> host+port}
+     * @param task2component {task id -> component}
      * @param includeSys     whether to include system streams
      * @param compId         component id
      * @return a list of host+port
@@ -1690,24 +1689,11 @@ public class StatsUtil {
         return ret;
     }
 
-
-    /**
-     * convert a list of clojure executors to a java Set of List<Integer>
-     */
-    public static Set<List<Integer>> convertExecutors(Set executors) {
-        Set<List<Integer>> convertedExecutors = new HashSet<>();
-        for (Object executor : executors) {
-            List l = (List) executor;
-            convertedExecutors.add(convertExecutor(l));
-        }
-        return convertedExecutors;
-    }
-
     /**
-     * convert a clojure executor to java List<Integer>
+     * convert a List<Long> executor to java List<Integer>
      */
-    public static List<Integer> convertExecutor(List executor) {
-        return Lists.newArrayList(((Number) executor.get(0)).intValue(), ((Number) executor.get(1)).intValue());
+    public static List<Integer> convertExecutor(List<Long> executor) {
+        return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
     }
 
     /**
@@ -2249,14 +2235,12 @@ public class StatsUtil {
     }
 
     /**
-     * convert clojure structure to java maps
+     * convert the Long executor ids in the ZK heartbeat map to Integer executor ids
      */
-    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map executorBeats) {
+    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
         Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
-        for (Object executorBeat : executorBeats.entrySet()) {
-            Map.Entry entry = (Map.Entry) executorBeat;
-            List startEnd = (List) entry.getKey();
-            ret.put(convertExecutor(startEnd), (ExecutorStats) entry.getValue());
+        for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+            ret.put(convertExecutor(entry.getKey()), entry.getValue());
         }
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
index 6acb19b..0acb409 100644
--- a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
@@ -27,9 +27,9 @@ import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.topology.ReportedFailedException;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ShellBoltMessageQueue;
 import org.apache.storm.utils.ShellProcess;
-import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,9 +140,9 @@ public class ShellBolt implements IBolt {
         _context = context;
 
         if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
         } else {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
         }
 
         _process = new ShellProcess(_command);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java b/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
index 598a54c..b508bf1 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
@@ -25,7 +25,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
-import clojure.lang.Numbers;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,7 +43,7 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
     public void execute(Tuple tuple) {
         Object key = tuple.getValue(1);
         Number curr = Utils.get(_sums, key, 0);
-        _sums.put(key, Numbers.add(curr, tuple.getValue(2)));
+        _sums.put(key, curr.longValue()  + ((Number)tuple.getValue(2)).longValue());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java b/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
index 003d780..27f41bf 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
@@ -17,7 +17,8 @@
  */
 package org.apache.storm.trident.operation.builtin;
 
-import clojure.lang.Numbers;
+import java.math.BigDecimal;
+
 import org.apache.storm.trident.operation.CombinerAggregator;
 import org.apache.storm.trident.tuple.TridentTuple;
 
@@ -31,12 +32,29 @@ public class Sum implements CombinerAggregator<Number> {
 
     @Override
     public Number combine(Number val1, Number val2) {
-        return Numbers.add(val1, val2);
+        if (val1 instanceof BigDecimal || val2 instanceof BigDecimal) {
+            BigDecimal v1 = asBigDecimal(val1);
+            BigDecimal v2 = asBigDecimal(val2);
+            return (v1).add(v2);
+        }
+        if (val1 instanceof Double || val2 instanceof Double) {
+            return val1.doubleValue() + val2.doubleValue();
+        }
+        return val1.longValue() + val2.longValue();
+    }
+
+    private static BigDecimal asBigDecimal(Number val) {
+        BigDecimal ret;
+        if (val instanceof BigDecimal) {
+            ret = (BigDecimal) val;
+        } else {
+            ret = new BigDecimal(val.doubleValue());
+        }
+        return ret;
     }
 
     @Override
     public Number zero() {
         return 0;
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java b/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
index 02f28c6..df85d80 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
@@ -19,9 +19,6 @@ package org.apache.storm.trident.tuple;
 
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import clojure.lang.IPersistentVector;
-import clojure.lang.PersistentVector;
-import clojure.lang.RT;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,9 +32,9 @@ import java.util.Arrays;
  * Extends AbstractList so that it can be emitted directly as Storm tuples
  */
 public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
-    ValuePointer[] _index;
-    Map<String, ValuePointer> _fieldIndex;
-    IPersistentVector _delegates;
+    private final ValuePointer[] _index;
+    private final Map<String, ValuePointer> _fieldIndex;
+    private final List<List<Object>> _delegates;
 
     public static class ProjectionFactory implements Factory {
         Map<String, ValuePointer> _fieldIndex;
@@ -90,7 +87,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(List<Object> selfVals) {
-            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
+            return new TridentTupleView(Arrays.asList(selfVals), _index, _fieldIndex);
         }
 
         @Override
@@ -138,8 +135,8 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
-            IPersistentVector curr = parent._delegates;
-            curr = (IPersistentVector) RT.conj(curr, selfVals);
+            List<List<Object>> curr = new ArrayList<>(parent._delegates);
+            curr.add(selfVals);
             return new TridentTupleView(curr, _index, _fieldIndex);
         }
 
@@ -174,7 +171,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(Tuple parent) {            
-            return new TridentTupleView(PersistentVector.EMPTY.cons(parent.getValues()), index, fieldIndex);
+            return new TridentTupleView(Arrays.asList(parent.getValues()), index, fieldIndex);
         }
 
         @Override
@@ -204,7 +201,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
     public static final TridentTupleView EMPTY_TUPLE = new TridentTupleView(null, new ValuePointer[0], new HashMap());
 
     // index and fieldIndex are precomputed, delegates built up over many operations using persistent data structures
-    public TridentTupleView(IPersistentVector delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
+    public TridentTupleView(List delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
         _delegates = delegates;
         _index = index;
         _fieldIndex = fieldIndex;
@@ -356,6 +353,6 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
     }
 
     private Object getValueByPointer(ValuePointer ptr) {
-        return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);     
+        return _delegates.get(ptr.delegateIndex).get(ptr.index);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
index 88697ef..bf727e0 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
@@ -18,6 +18,7 @@
 package org.apache.storm.tuple;
 
 import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.GeneralTopologyContext;
 
 /**
  * The tuple is the main data structure in Storm. A tuple is a named list of values, 
@@ -65,4 +66,9 @@ public interface Tuple extends ITuple{
      * Gets the message id that associated with this tuple.
      */
     public MessageId getMessageId();
+
+    /**
+     * Gets the topology context associated with the tuple
+     */
+    public GeneralTopologyContext getContext();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
index c793dfd..9356c4b 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
@@ -17,33 +17,38 @@
  */
 package org.apache.storm.tuple;
 
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.utils.IndifferentAccessMap;
-import clojure.lang.ASeq;
-import clojure.lang.Counted;
-import clojure.lang.IMeta;
-import clojure.lang.IPersistentMap;
-import clojure.lang.ISeq;
-import clojure.lang.Indexed;
-import clojure.lang.Keyword;
-import clojure.lang.MapEntry;
-import clojure.lang.Obj;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Seqable;
-import clojure.lang.Symbol;
-import java.util.List;
 
-public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
-    private List<Object> values;
-    private int taskId;
-    private String streamId;
-    private GeneralTopologyContext context;
-    private MessageId id;
-    private IPersistentMap _meta;
+public class TupleImpl implements Tuple {
+    private final List<Object> values;
+    private final int taskId;
+    private final String streamId;
+    private final GeneralTopologyContext context;
+    private final MessageId id;
+    private Long _processSampleStartTime;
+    private Long _executeSampleStartTime;
+    private long _outAckVal = 0;
     
+    public TupleImpl(Tuple t) {
+        this.values = t.getValues();
+        this.taskId = t.getSourceTask();
+        this.streamId = t.getSourceStreamId();
+        this.id = t.getMessageId();
+        this.context = t.getContext();
+        if (t instanceof TupleImpl) {
+            TupleImpl ti = (TupleImpl) t;
+            this._processSampleStartTime = ti._processSampleStartTime;
+            this._executeSampleStartTime = ti._executeSampleStartTime;
+            this._outAckVal = ti._outAckVal;
+        }
+    }
+
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
-        this.values = values;
+        this.values = Collections.unmodifiableList(values);
         this.taskId = taskId;
         this.streamId = streamId;
         this.id = id;
@@ -61,10 +66,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
         this(context, values, taskId, streamId, MessageId.makeUnanchored());
-    }    
-    
-    Long _processSampleStartTime;
-    Long _executeSampleStartTime;
+    }
     
     public void setProcessSampleStartTime(long ms) {
         _processSampleStartTime = ms;
@@ -82,8 +84,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return _executeSampleStartTime;
     }
     
-    long _outAckVal = 0;
-    
     public void updateAckVal(long val) {
         _outAckVal = _outAckVal ^ val;
     }
@@ -91,140 +91,176 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public long getAckVal() {
         return _outAckVal;
     }
-
+    
+    /** Tuple APIs */
+    @Override
     public int size() {
         return values.size();
     }
-    
+
+    @Override
     public int fieldIndex(String field) {
         return getFields().fieldIndex(field);
     }
     
+    @Override
     public boolean contains(String field) {
         return getFields().contains(field);
     }
     
+    @Override
     public Object getValue(int i) {
         return values.get(i);
     }
 
+    @Override
     public String getString(int i) {
         return (String) values.get(i);
     }
 
+    @Override
     public Integer getInteger(int i) {
         return (Integer) values.get(i);
     }
 
+    @Override
     public Long getLong(int i) {
         return (Long) values.get(i);
     }
 
+    @Override
     public Boolean getBoolean(int i) {
         return (Boolean) values.get(i);
     }
 
+    @Override
     public Short getShort(int i) {
         return (Short) values.get(i);
     }
 
+    @Override
     public Byte getByte(int i) {
         return (Byte) values.get(i);
     }
 
+    @Override
     public Double getDouble(int i) {
         return (Double) values.get(i);
     }
 
+    @Override
     public Float getFloat(int i) {
         return (Float) values.get(i);
     }
 
+    @Override
     public byte[] getBinary(int i) {
         return (byte[]) values.get(i);
     }
-    
-    
+
+    @Override
     public Object getValueByField(String field) {
         return values.get(fieldIndex(field));
     }
 
+    @Override
     public String getStringByField(String field) {
         return (String) values.get(fieldIndex(field));
     }
 
+    @Override
     public Integer getIntegerByField(String field) {
         return (Integer) values.get(fieldIndex(field));
     }
 
+    @Override
     public Long getLongByField(String field) {
         return (Long) values.get(fieldIndex(field));
     }
 
+    @Override
     public Boolean getBooleanByField(String field) {
         return (Boolean) values.get(fieldIndex(field));
     }
 
+    @Override
     public Short getShortByField(String field) {
         return (Short) values.get(fieldIndex(field));
     }
 
+    @Override
     public Byte getByteByField(String field) {
         return (Byte) values.get(fieldIndex(field));
     }
 
+    @Override
     public Double getDoubleByField(String field) {
         return (Double) values.get(fieldIndex(field));
     }
 
+    @Override
     public Float getFloatByField(String field) {
         return (Float) values.get(fieldIndex(field));
     }
 
+    @Override
     public byte[] getBinaryByField(String field) {
         return (byte[]) values.get(fieldIndex(field));
     }
-    
+
+    @Override
     public List<Object> getValues() {
         return values;
     }
-    
+
+    @Override    
     public Fields getFields() {
         return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
     }
 
+    @Override
     public List<Object> select(Fields selector) {
         return getFields().select(selector, values);
     }
     
-    @Deprecated
+    @Override
     public GlobalStreamId getSourceGlobalStreamid() {
         return getSourceGlobalStreamId();
     }
-    
+
+    @Override
     public GlobalStreamId getSourceGlobalStreamId() {
         return new GlobalStreamId(getSourceComponent(), streamId);
     }
-    
+
+    @Override
     public String getSourceComponent() {
         return context.getComponentId(taskId);
     }
-    
+
+    @Override
     public int getSourceTask() {
         return taskId;
     }
-    
+
+    @Override
     public String getSourceStreamId() {
         return streamId;
     }
-    
+
+    @Override
     public MessageId getMessageId() {
         return id;
     }
     
     @Override
+    public GeneralTopologyContext getContext() {
+        return context;
+    }
+    
+    @Override
     public String toString() {
-        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
+        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString() + " PROC_START_TIME(sampled): " + _processSampleStartTime + " EXEC_START_TIME(sampled): " + _executeSampleStartTime;
     }
     
     @Override
@@ -236,121 +272,4 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public int hashCode() {
         return System.identityHashCode(this);
     }
-
-    private Keyword makeKeyword(String name) {
-        return Keyword.intern(Symbol.create(name));
-    }    
-
-    /* ILookup */
-    @Override
-    public Object valAt(Object o) {
-        try {
-            if(o instanceof Keyword) {
-                return getValueByField(((Keyword) o).getName());
-            } else if(o instanceof String) {
-                return getValueByField((String) o);
-            }
-        } catch(IllegalArgumentException ignored) {
-        }
-        return null;
-    }
-
-    /* Seqable */
-    public ISeq seq() {
-        if(values.size() > 0) {
-            return new Seq(getFields().toList(), values, 0);
-        }
-        return null;
-    }
-
-    static class Seq extends ASeq implements Counted {
-        final List<String> fields;
-        final List<Object> values;
-        final int i;
-
-        Seq(List<String> fields, List<Object> values, int i) {
-            this.fields = fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
-            super(meta);
-            this.fields= fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Object first() {
-            return new MapEntry(fields.get(i), values.get(i));
-        }
-
-        public ISeq next() {
-            if(i+1 < fields.size()) {
-                return new Seq(fields, values, i+1);
-            }
-            return null;
-        }
-
-        public int count() {
-            assert fields.size() -i >= 0 : "index out of bounds";
-            // i being the position in the fields of this seq, the remainder of the seq is the size
-            return fields.size() -i;
-        }
-
-        public Obj withMeta(IPersistentMap meta) {
-            return new Seq(meta, fields, values, i);
-        }
-    }
-
-    /* Indexed */
-    public Object nth(int i) {
-        if(i < values.size()) {
-            return values.get(i);
-        } else {
-            return null;
-        }
-    }
-
-    public Object nth(int i, Object notfound) {
-        Object ret = nth(i);
-        if(ret==null) ret = notfound;
-        return ret;
-    }
-
-    /* Counted */
-    public int count() {
-        return values.size();
-    }
-    
-    /* IMeta */
-    public IPersistentMap meta() {
-        if(_meta==null) {
-            _meta = new PersistentArrayMap( new Object[] {
-            makeKeyword("stream"), getSourceStreamId(), 
-            makeKeyword("component"), getSourceComponent(), 
-            makeKeyword("task"), getSourceTask()});
-        }
-        return _meta;
-    }
-
-    private PersistentArrayMap toMap() {
-        Object array[] = new Object[values.size()*2];
-        List<String> fields = getFields().toList();
-        for(int i=0; i < values.size(); i++) {
-            array[i*2] = fields.get(i);
-            array[(i*2)+1] = values.get(i);
-        }
-        return new PersistentArrayMap(array);
-    }
-
-    public IPersistentMap getMap() {
-        if(_map==null) {
-            setMap(toMap());
-        }
-        return _map;
-    }    
-    
 }
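
TupleImpl also gains a public copy constructor, TupleImpl(Tuple), which copies the values, task id, stream id, message id and context, and carries over the sampled start times and ack value when the source is itself a TupleImpl. ClojureBolt below wraps every incoming tuple in a ClojureTuple built from a Tuple, and this constructor makes that style of wrapping straightforward. A hypothetical delegating wrapper, for illustration only:

    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.TupleImpl;

    // Hypothetical wrapper in the spirit of ClojureTuple: delegates all
    // Tuple behaviour to TupleImpl and adds tracing on field access.
    public class TracingTuple extends TupleImpl {
        public TracingTuple(Tuple wrapped) {
            super(wrapped); // copy constructor introduced in this change
        }

        @Override
        public Object getValueByField(String field) {
            Object value = super.getValueByField(field);
            System.out.println("read field " + field + " -> " + value);
            return value;
        }
    }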

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java b/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
deleted file mode 100644
index 2675ab7..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-
-import clojure.lang.ILookup;
-import clojure.lang.ISeq;
-import clojure.lang.AFn;
-import clojure.lang.IPersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.IMapEntry;
-import clojure.lang.IPersistentCollection;
-import clojure.lang.Keyword;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Collection;
-import java.util.Set;
-
-public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {
-
-    protected IPersistentMap _map;
-
-    protected IndifferentAccessMap() {
-    }
-
-    public IndifferentAccessMap(IPersistentMap map) {
-        setMap(map);
-    }
-
-    public IPersistentMap getMap() {
-        return _map;
-    }
-
-    public IPersistentMap setMap(IPersistentMap map) {
-        _map = map;
-        return _map;
-    }
-
-    public int size() {
-        return ((Map) getMap()).size();
-    }
-
-    public int count() {
-        return size();
-    }
-
-    public ISeq seq() {
-        return getMap().seq();
-    }
-
-    @Override
-    public Object valAt(Object o) {
-        if(o instanceof Keyword) {
-            return valAt(((Keyword) o).getName());
-        }
-        return getMap().valAt(o);
-    }
-    
-    @Override
-    public Object valAt(Object o, Object def) {
-        Object ret = valAt(o);
-        if(ret==null) ret = def;
-        return ret;
-    }
-
-    /* IFn */
-    @Override
-    public Object invoke(Object o) {
-        return valAt(o);
-    }
-
-    @Override
-    public Object invoke(Object o, Object notfound) {
-        return valAt(o, notfound);
-    }
-
-    /* IPersistentMap */
-    /* Naive implementation, but it might be good enough */
-    public IPersistentMap assoc(Object k, Object v) {
-        if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
-        
-        return new IndifferentAccessMap(getMap().assoc(k, v));
-    }
-
-    public IPersistentMap assocEx(Object k, Object v) {
-        if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
-
-        return new IndifferentAccessMap(getMap().assocEx(k, v));
-    }
-
-    public IPersistentMap without(Object k) {
-        if(k instanceof Keyword) return without(((Keyword) k).getName());
-
-        return new IndifferentAccessMap(getMap().without(k));
-    }
-
-    public boolean containsKey(Object k) {
-        if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
-        return getMap().containsKey(k);
-    }
-
-    public IMapEntry entryAt(Object k) {
-        if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
-
-        return getMap().entryAt(k);
-    }
-
-    public IPersistentCollection cons(Object o) {
-        return getMap().cons(o);
-    }
-
-    public IPersistentCollection empty() {
-        return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
-    }
-
-    public boolean equiv(Object o) {
-        return getMap().equiv(o);
-    }
-
-    public Iterator iterator() {
-        return getMap().iterator();
-    }
-
-    /* Map */
-    public boolean containsValue(Object v) {
-        return ((Map) getMap()).containsValue(v);
-    }
-
-    public Set entrySet() {
-        return ((Map) getMap()).entrySet();
-    }
-
-    public Object get(Object k) {
-        return valAt(k);
-    }
-
-    public boolean isEmpty() {
-        return ((Map) getMap()).isEmpty();
-    }
-
-    public Set keySet() {
-        return ((Map) getMap()).keySet();
-    }
-
-    public Collection values() {
-        return ((Map) getMap()).values();
-    }
-    
-    /* Not implemented */
-    public void clear() {
-        throw new UnsupportedOperationException();
-    }
-    public Object put(Object k, Object v) {
-        throw new UnsupportedOperationException();
-    }
-    public void putAll(Map m) {
-        throw new UnsupportedOperationException();
-    }
-    public Object remove(Object k) {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 771dc70..effaba2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.utils;
 
-import clojure.lang.RT;
 import org.apache.storm.Config;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -864,15 +863,6 @@ public class Utils {
         return findAndReadConfigFile(name, true);
     }
 
-    public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
-        try {
-            clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
-        } catch (Exception e) {
-            //if playing from the repl and defining functions, file won't exist
-        }
-        return (clojure.lang.IFn) RT.var(namespace, name).deref();
-    }
-
     /**
      * "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}"
      * Reverses an assoc-list style Map like reverseMap(Map...)

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml
index cef8fd0..f525d01 100644
--- a/storm-clojure/pom.xml
+++ b/storm-clojure/pom.xml
@@ -27,6 +27,10 @@
 
     <artifactId>storm-clojure</artifactId>
 
+    <properties>
+        <argLine></argLine>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -43,6 +47,17 @@
             <artifactId>json-simple</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>carbonite</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build> 
@@ -55,15 +70,41 @@
                     <sourceDirectories>
                         <sourceDirectory>src/clj</sourceDirectory>
                     </sourceDirectories>
+                    <testSourceDirectories>
+                        <testSourceDirectory>test/clj</testSourceDirectory>
+                    </testSourceDirectories>
+                    <warnOnReflection>false</warnOnReflection>
+                    <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
+                    <copiedNamespaces>
+                        <copiedNamespace>none</copiedNamespace>
+                    </copiedNamespaces>
                 </configuration>
                 <executions>
                     <execution>
-                        <id>compile</id>
+                        <id>compile-clojure</id>
                         <phase>compile</phase>
                         <goals>
                             <goal>compile</goal>
                         </goals>
                     </execution>
+                    <execution>
+                        <id>test-clojure</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <junitOutput>true</junitOutput>
+                            <testScript>./test/resources/test_runner.clj</testScript>
+                            <!-- argLine is set by JaCoCo for code coverage -->
+                            <vmargs>-Xmx1536m ${argLine} ${test.extra.args}</vmargs>
+                            <!-- Run clojure unit tests or all tests (including integration tests) depending on the profile enabled -->
+                            <testNamespaces>
+                                <testNamespace>${clojure.test.set}</testNamespace>
+                            </testNamespaces>
+                            <testDeclaredNamespaceOnly>${clojure.test.declared.namespace.only}</testDeclaredNamespaceOnly>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
new file mode 100644
index 0000000..1e9b8ba
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import clojure.lang.IFn;
+import clojure.lang.Keyword;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import clojure.lang.Symbol;
+
+public class ClojureBolt implements IRichBolt, FinishedCallback {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+
+    IBolt _bolt;
+
+    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+
+    @Override
+    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+        IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                    Keyword.intern(Symbol.create("output-collector")), collector,
+                    Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(stormConf);
+                add(context);
+                add(collectorMap);
+            }};
+
+            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _bolt.prepare(stormConf, context, collector);
+            } catch(AbstractMethodError ame) {
+
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        _bolt.execute(new ClojureTuple(input));
+    }
+
+    @Override
+    public void cleanup() {
+        try {
+            _bolt.cleanup();
+        } catch(AbstractMethodError ame) {
+
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+
+    @Override
+    public void finishedId(Object id) {
+        if(_bolt instanceof FinishedCallback) {
+            ((FinishedCallback) _bolt).finishedId(id);
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
new file mode 100644
index 0000000..df0840b
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.serialization.SerializationRegister;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import carbonite.JavaBridge;
+
+public class ClojureSerializationRegister implements SerializationRegister {
+
+    @Override
+    public void register(Kryo kryo) throws Exception {
+        JavaBridge.registerPrimitives(kryo);
+        JavaBridge.registerCollections(kryo);
+    }
+}
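
SerializationRegister is the extension point that lets modules outside storm-client contribute Kryo registrations; here storm-clojure uses it to register Clojure primitives and collections through carbonite's JavaBridge. Implementations of this kind are typically discovered via java.util.ServiceLoader; assuming that is the mechanism here, wiring up an additional register would amount to a class like the hypothetical one below plus a META-INF/services/org.apache.storm.serialization.SerializationRegister file naming it:

    package com.example.storm;  // hypothetical package, not part of this commit

    import com.esotericsoftware.kryo.Kryo;

    import org.apache.storm.serialization.SerializationRegister;

    public class ExtraTypesSerializationRegister implements SerializationRegister {
        @Override
        public void register(Kryo kryo) throws Exception {
            // Register additional classes so Kryo assigns them stable ids.
            kryo.register(java.util.UUID.class);
            kryo.register(java.time.LocalDate.class);
        }
    }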

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
new file mode 100644
index 0000000..efa3dd6
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import clojure.lang.IFn;
+import clojure.lang.Keyword;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import clojure.lang.Symbol;
+
+public class ClojureSpout implements IRichSpout {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+    
+    ISpout _spout;
+    
+    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+    
+
+    @Override
+    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(conf);
+                add(context);
+                add(collectorMap);
+            }};
+            
+            _spout = (ISpout) preparer.applyTo(RT.seq(args));
+            //this is kind of unnecessary for clojure
+            try {
+                _spout.open(conf, context, collector);
+            } catch(AbstractMethodError ame) {
+                
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            _spout.close();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            _spout.nextTuple();
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        try {
+            _spout.ack(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        try {
+            _spout.fail(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+    
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void activate() {
+        try {
+            _spout.activate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        try {
+            _spout.deactivate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+}