You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/28 01:44:27 UTC

[1/6] storm git commit: STORM-2468: Remove clojure from storm-client

Repository: storm
Updated Branches:
  refs/heads/master de2907f85 -> 2ec5581bb


http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/resources/test_runner.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/resources/test_runner.clj b/storm-core/test/resources/test_runner.clj
deleted file mode 100644
index c10fec3..0000000
--- a/storm-core/test/resources/test_runner.clj
+++ /dev/null
@@ -1,114 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.testrunner)
-
-(import `java.util.Properties)
-(import `java.io.ByteArrayOutputStream)
-(import `java.io.FileInputStream)
-(import `java.io.FileOutputStream)
-(import `java.io.FileWriter)
-(import `java.io.File)
-(import `java.io.OutputStream)
-(import `java.io.OutputStreamWriter)
-(import `java.io.PrintStream)
-(import `java.io.PrintWriter)
-(use 'clojure.test)
-(use 'clojure.test.junit)
-
-(def props (Properties.))
-(.load props (FileInputStream. (first *command-line-args*)))
-
-(def namespaces  (into [] 
-                       (for [[key val] props
-                             :when (.startsWith key "ns.")]
-                               (symbol val))))
-
-(def output-dir (.get props "outputDir"))
-
-(dorun (for [ns namespaces]
-  (require ns)))
-
-(.mkdirs (File. output-dir))
-
-(let [sys-out System/out
-      sys-err System/err
-      num-bad (atom 0)
-      original-junit-report junit-report
-      orig-out *out*]
-  (dorun (for [ns namespaces]
-    (with-open [out-stream (FileOutputStream. (str output-dir "/" ns ".xml"))
-                print-writer (PrintWriter. out-stream true)
-                print-stream (PrintStream. out-stream true)]
-      (.println sys-out (str "Running " ns))
-      (try
-        (let [in-sys-out (atom false)]
-        (binding [*test-out* print-writer
-                  *out* orig-out
-                  junit-report (fn [data]
-                                   (let [type (data :type)]
-                                     (cond
-                                       (= type :begin-test-var) (do
-                                                                  (when @in-sys-out
-                                                                    (reset! in-sys-out false)
-                                                                    (System/setOut sys-out)
-                                                                    (System/setErr sys-err)
-                                                                    (set! *out* orig-out)
-                                                                    (with-test-out
-                                                                      (print "]]>")
-                                                                      (finish-element 'system-out true)))
-                                                                   (original-junit-report data)
-                                                                   (reset! in-sys-out true)
-                                                                   (with-test-out
-                                                                     (start-element 'system-out true)
-                                                                     (print "<![CDATA[") (flush))
-                                                                   (System/setOut print-stream)
-                                                                   (System/setErr print-stream)
-                                                                   (set! *out* print-writer))
-                                       (= type :end-test-var) (when @in-sys-out
-                                                                (reset! in-sys-out false)
-                                                                (System/setOut sys-out)
-                                                                (System/setErr sys-err)
-                                                                (set! *out* orig-out)
-                                                                (with-test-out
-                                                                  (print "]]>")
-                                                                  (finish-element 'system-out true)))
-                                       (= type :fail) (when @in-sys-out
-                                                                (reset! in-sys-out false)
-                                                                (System/setOut sys-out)
-                                                                (System/setErr sys-err)
-                                                                (set! *out* orig-out)
-                                                                (with-test-out
-                                                                  (print "]]>")
-                                                                  (finish-element 'system-out true)))
-                                       (= type :error) (when @in-sys-out
-                                                                (reset! in-sys-out false)
-                                                                (System/setOut sys-out)
-                                                                (System/setErr sys-err)
-                                                                (set! *out* orig-out)
-                                                                (with-test-out
-                                                                  (print "]]>")
-                                                                  (finish-element 'system-out true))))
-                                     (if (not (= type :begin-test-var)) (original-junit-report data))))]
-          (with-junit-output
-            (let [result (run-tests ns)]
-               (.println sys-out (str "Tests run: " (result :test) ", Passed: " (result :pass) ", Failures: " (result :fail) ", Errors: " (result :error)))
-               (reset! num-bad (+ @num-bad (result :error) (result :fail)))))))
-        (finally 
-          (System/setOut sys-out)
-          (System/setErr sys-err))))))
-  (shutdown-agents)
-  (System/exit (if (> @num-bad 0) 1 0)))


[4/6] storm git commit: STORM-2468: Added in missing license header

Posted by bo...@apache.org.
STORM-2468: Added in missing license header


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8e1128ac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8e1128ac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8e1128ac

Branch: refs/heads/master
Commit: 8e1128ac9fc7cc7b9df20096e2b639615e29650d
Parents: a4cf917
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Apr 14 20:53:08 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Apr 14 20:53:08 2017 -0500

----------------------------------------------------------------------
 ...apache.storm.serialization.SerializationRegister | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8e1128ac/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
index cb66fd0..a90e755 100644
--- a/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
+++ b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
@@ -1 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 org.apache.storm.clojure.ClojureSerializationRegister


[3/6] storm git commit: STORM-2468: Remove clojure from storm-client

Posted by bo...@apache.org.
STORM-2468: Remove clojure from storm-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4cf917d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4cf917d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4cf917d

Branch: refs/heads/master
Commit: a4cf917d35352211a80502e04b08d3318cfeb6d1
Parents: 487a246
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 10 07:51:02 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Apr 14 20:51:43 2017 -0500

----------------------------------------------------------------------
 docs/Serialization.md                           |  10 +-
 .../org/apache/storm/starter/clj/bolts_test.clj |  26 +-
 storm-client/pom.xml                            |  13 +-
 .../org/apache/storm/clojure/ClojureBolt.java   | 120 -----
 .../org/apache/storm/clojure/ClojureSpout.java  | 153 -------
 .../org/apache/storm/clojure/RichShellBolt.java |  51 ---
 .../apache/storm/clojure/RichShellSpout.java    |  51 ---
 .../org/apache/storm/cluster/IStateStorage.java |   1 -
 .../storm/cluster/StormClusterStateImpl.java    |   6 -
 .../storm/executor/bolt/BoltExecutor.java       |   4 +-
 .../executor/bolt/BoltOutputCollectorImpl.java  |   9 +-
 .../storm/metric/api/IMetricsConsumer.java      |   8 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../serialization/SerializationRegister.java    |  35 ++
 .../jvm/org/apache/storm/spout/ShellSpout.java  |   6 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  52 +--
 .../jvm/org/apache/storm/task/ShellBolt.java    |   6 +-
 .../storm/testing/KeyedSummingBatchBolt.java    |   3 +-
 .../storm/trident/operation/builtin/Sum.java    |  24 +-
 .../storm/trident/tuple/TridentTupleView.java   |  21 +-
 .../src/jvm/org/apache/storm/tuple/Tuple.java   |   6 +
 .../jvm/org/apache/storm/tuple/TupleImpl.java   | 235 ++++------
 .../storm/utils/IndifferentAccessMap.java       | 177 --------
 .../src/jvm/org/apache/storm/utils/Utils.java   |  10 -
 storm-clojure/pom.xml                           |  43 +-
 .../org/apache/storm/clojure/ClojureBolt.java   | 119 +++++
 .../clojure/ClojureSerializationRegister.java   |  33 ++
 .../org/apache/storm/clojure/ClojureSpout.java  | 154 +++++++
 .../org/apache/storm/clojure/ClojureTuple.java  | 439 +++++++++++++++++++
 .../org/apache/storm/clojure/ClojureUtil.java   |  31 ++
 .../storm/clojure/IndifferentAccessMap.java     | 176 ++++++++
 .../org/apache/storm/clojure/RichShellBolt.java |  51 +++
 .../apache/storm/clojure/RichShellSpout.java    |  51 +++
 ...he.storm.serialization.SerializationRegister |   1 +
 .../test/clj/org/apache/storm/tuple_test.clj    |  53 +++
 storm-clojure/test/resources/log4j2-test.xml    |  32 ++
 storm-clojure/test/resources/test_runner.clj    | 114 +++++
 storm-core/pom.xml                              | 193 +-------
 storm-core/src/clj/org/apache/storm/config.clj  |  37 --
 .../clj/org/apache/storm/daemon/logviewer.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon_config.clj  |  30 ++
 .../clj/org/apache/storm/internal/clojure.clj   | 204 ---------
 storm-core/src/clj/org/apache/storm/log.clj     |  34 --
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 134 ------
 .../org/apache/storm/integration_test.clj       |   2 +-
 .../org/apache/storm/testing4j_test.clj         |   4 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |   2 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 .../clj/org/apache/storm/logviewer_test.clj     |   2 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   2 +-
 .../scheduler/multitenant_scheduler_test.clj    |   2 +-
 .../apache/storm/security/auth/auth_test.clj    |   2 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   2 +-
 .../clj/org/apache/storm/transactional_test.clj |   2 +-
 .../test/clj/org/apache/storm/tuple_test.clj    |  52 ---
 storm-core/test/resources/test_runner.clj       | 114 -----
 58 files changed, 1563 insertions(+), 1604 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/docs/Serialization.md
----------------------------------------------------------------------
diff --git a/docs/Serialization.md b/docs/Serialization.md
index c2e129b..e35a0f9 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -9,7 +9,7 @@ Tuples can be comprised of objects of any types. Since Storm is a distributed sy
 
 Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations.
 
-By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, HashSet, and the Clojure collection types. If you want to use another type in your tuples, you'll need to register a custom serializer.
+By default, Storm can serialize primitive types, strings, byte arrays, ArrayList, HashMap, and HashSet. If you want to use another type in your tuples, you'll need to register a custom serializer.
 
 ### Dynamic typing
 
@@ -25,7 +25,7 @@ Finally, another reason for using dynamic typing is so Storm can be used in a st
 
 As mentioned, Storm uses Kryo for serialization. To implement custom serializers, you need to register new serializers with Kryo. It's highly recommended that you read over [Kryo's home page](https://github.com/EsotericSoftware/kryo) to understand how it handles custom serialization.
 
-Adding custom serializers is done through the "topology.kryo.register" property in your topology config. It takes a list of registrations, where each registration can take one of two forms:
+Adding custom serializers is done through the "topology.kryo.register" property in your topology config or through a ServiceLoader described later. The config takes a list of registrations, where each registration can take one of two forms:
 
 1. The name of a class to register. In this case, Storm will use Kryo's `FieldsSerializer` to serialize the class. This may or may not be optimal for the class -- see the Kryo docs for more details.
 2. A map from the name of a class to register to an implementation of [com.esotericsoftware.kryo.Serializer](https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/Serializer.java).
@@ -45,6 +45,12 @@ Storm provides helpers for registering serializers in a topology config. The [Co
 
 There's an advanced config called `Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS`. If you set this to true, Storm will ignore any serializations that are registered but do not have their code available on the classpath. Otherwise, Storm will throw errors when it can't find a serialization. This is useful if you run many topologies on a cluster that each have different serializations, but you want to declare all the serializations across all topologies in the `storm.yaml` files.
 
+#### SerializationRegister Service Loader
+
+If you want to provide language bindings to Storm, have a library that you want to interact cleanly with Storm, or have some other reason to provide serialization bindings, and you don't want to force users to update their configs, you can use the `org.apache.storm.serialization.SerializationRegister` service loader.
+
+You may use this like any other service loader, and Storm will register the bindings without forcing users to update their configs. The storm-clojure package uses this to provide transparent support for Clojure types.
+
 ### Java serialization
 
 If Storm encounters a type for which it doesn't have a serialization registered, it will use Java serialization if possible. If the object can't be serialized with Java serialization, then Storm will throw an error.

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
index 164e7ac..745678c 100644
--- a/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
+++ b/examples/storm-starter/test/clj/org/apache/storm/starter/clj/bolts_test.clj
@@ -20,10 +20,12 @@
             [org.apache.storm.starter.clj.bolts :refer
              [rolling-count-bolt intermediate-rankings-bolt total-rankings-bolt]]
             [org.apache.storm [testing :refer :all]])
-  (:import [org.apache.storm Constants]
+  (:import [org.apache.storm Constants Testing]
+           [org.apache.storm.testing MkTupleParam]
            [org.apache.storm.task OutputCollector IOutputCollector]
            [org.apache.storm.starter.tools Rankable]
-           [org.apache.storm.tuple Tuple]))
+           [org.apache.storm.tuple Tuple]
+           [java.util ArrayList]))
 
 (defn execute-tuples [bolt tuples]
   (let [out (atom [])]
@@ -40,21 +42,11 @@
 
 (defn- mock-tuple [m & {component :component stream-id :stream-id
                         :or {component "1" stream-id "1"}}]
-  (reify
-    Tuple
-    (getSourceComponent [_]
-      component)
-    (getSourceStreamId [_]
-      stream-id)
-    (getString [this i]
-      (nth (vals m) 0))
-    (getValues [_]
-      (vals m))
-    clojure.lang.IPersistentMap
-    (valAt [_ key]
-      (get m key))
-    (seq [_]
-      (seq m))))
+  (let [param (MkTupleParam.)]
+    (.setStream param stream-id)
+    (.setComponent param component)
+    (.setFieldsList param (ArrayList. (.keySet m)))
+    (Testing/testTuple (ArrayList. (.values m)) param)))
 
 (def ^{:private true} tick-tuple
   (mock-tuple {}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 9813593..ba27376 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -124,17 +124,6 @@
             <artifactId>snakeyaml</artifactId>
         </dependency>
 
-        <!-- clojure -->
-        <dependency>
-            <groupId>org.clojure</groupId>
-            <artifactId>clojure</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.twitter</groupId>
-            <artifactId>carbonite</artifactId>
-            <scope>compile</scope>
-        </dependency>
-
         <!-- netty -->
         <dependency>
             <groupId>io.netty</groupId>
@@ -333,4 +322,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
deleted file mode 100644
index 60300e2..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/ClojureBolt.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import org.apache.storm.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class ClojureBolt implements IRichBolt, FinishedCallback {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    IBolt _bolt;
-    
-    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-
-    @Override
-    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(stormConf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _bolt.prepare(stormConf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        _bolt.execute(input);
-    }
-
-    @Override
-    public void cleanup() {
-            try {
-                _bolt.cleanup();
-            } catch(AbstractMethodError ame) {
-                
-            }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-
-    @Override
-    public void finishedId(Object id) {
-        if(_bolt instanceof FinishedCallback) {
-            ((FinishedCallback) _bolt).finishedId(id);
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
deleted file mode 100644
index 372b306..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/ClojureSpout.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.spout.ISpout;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureSpout implements IRichSpout {
-    Map<String, StreamInfo> _fields;
-    List<String> _fnSpec;
-    List<String> _confSpec;
-    List<Object> _params;
-    
-    ISpout _spout;
-    
-    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
-        _fnSpec = fnSpec;
-        _confSpec = confSpec;
-        _params = params;
-        _fields = fields;
-    }
-    
-
-    @Override
-    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-        try {
-            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
-                Keyword.intern(Symbol.create("output-collector")), collector,
-                Keyword.intern(Symbol.create("context")), context});
-            List<Object> args = new ArrayList<Object>() {{
-                add(conf);
-                add(context);
-                add(collectorMap);
-            }};
-            
-            _spout = (ISpout) preparer.applyTo(RT.seq(args));
-            //this is kind of unnecessary for clojure
-            try {
-                _spout.open(conf, context, collector);
-            } catch(AbstractMethodError ame) {
-                
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            _spout.close();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        try {
-            _spout.nextTuple();
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        try {
-            _spout.ack(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        try {
-            _spout.fail(msgId);
-        } catch(AbstractMethodError ame) {
-                
-        }
-
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _fields.keySet()) {
-            StreamInfo info = _fields.get(stream);
-            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
-        }
-    }
-    
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-        try {
-            return (Map) hof.applyTo(RT.seq(_params));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void activate() {
-        try {
-            _spout.activate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        try {
-            _spout.deactivate();
-        } catch(AbstractMethodError ame) {
-                
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
deleted file mode 100644
index 6de5637..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/RichShellBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.ShellBolt;
-import org.apache.storm.topology.IRichBolt;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellBolt extends ShellBolt implements IRichBolt {
-    private Map<String, StreamInfo> _outputs;
-    
-    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-    
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));                
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java b/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
deleted file mode 100644
index 9fb7e73..0000000
--- a/storm-client/src/jvm/org/apache/storm/clojure/RichShellSpout.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.clojure;
-
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.spout.ShellSpout;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellSpout extends ShellSpout implements IRichSpout {
-    private Map<String, StreamInfo> _outputs;
-
-    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
-        super(command);
-        _outputs = outputs;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String stream: _outputs.keySet()) {
-            StreamInfo def = _outputs.get(stream);
-            if(def.is_direct()) {
-                declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
-            } else {
-                declarer.declareStream(stream, new Fields(def.get_output_fields()));
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
index aa731ff..a004d25 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -18,7 +18,6 @@
 package org.apache.storm.cluster;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.callback.ZKStateChangedCallback;

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 51caad9..1655323 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -503,12 +503,6 @@ public class StormClusterStateImpl implements IStormClusterState {
         stateStorage.set_data(path, Utils.serialize(stormBase), acls);
     }
 
-    /**
-     * To update this function due to APersistentMap/APersistentSet is clojure's structure
-     * 
-     * @param stormId
-     * @param newElems
-     */
     @Override
     public void updateStorm(String stormId, StormBase newElems) {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 97b16d8..c3a18ad 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -115,7 +115,7 @@ public class BoltExecutor extends Executor {
             IBolt boltObject = (IBolt) idToTask.get(taskId).getTaskObject();
             boolean isSampled = sampler.call();
             boolean isExecuteSampler = executeSampler.call();
-            Long now = (isSampled || isExecuteSampler) ? System.currentTimeMillis() : null;
+            Long now = (isSampled || isExecuteSampler) ? Time.currentTimeMillis() : null;
             if (isSampled) {
                 tuple.setProcessSampleStartTime(now);
             }
@@ -130,7 +130,7 @@ public class BoltExecutor extends Executor {
                 LOG.info("Execute done TUPLE {} TASK: {} DELTA: {}", tuple, taskId, delta);
             }
             new BoltExecuteInfo(tuple, taskId, delta).applyOn(idToTask.get(taskId).getUserContext());
-            if (delta != 0) {
+            if (delta >= 0) {
                 ((BoltExecutorStats) stats).boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index c490c3d..46b945b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -115,7 +115,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
         }
         BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
         boltAckInfo.applyOn(taskData.getUserContext());
-        if (delta != 0) {
+        if (delta >= 0) {
             ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
                     input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
@@ -134,7 +134,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
         }
         BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
         boltFailInfo.applyOn(taskData.getUserContext());
-        if (delta != 0) {
+        if (delta >= 0) {
             ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
                     input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
@@ -156,9 +156,10 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
 
     private long tupleTimeDelta(TupleImpl tuple) {
         Long ms = tuple.getProcessSampleStartTime();
-        if (ms != null)
+        if (ms != null) {
             return Time.deltaMs(ms);
-        return 0;
+        }
+        return -1;
     }
 
     private void putXor(Map<Long, Long> pending, Long key, Long id) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java b/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
index e7ca10d..f2623be 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
@@ -39,7 +39,13 @@ public interface IMetricsConsumer {
         public String srcComponentId; 
         public int srcTaskId; 
         public long timestamp;
-        public int updateIntervalSecs; 
+        public int updateIntervalSecs;
+
+        @Override
+        public String toString() {
+            return "TASK_INFO: { host: " + srcWorkerHost + ":" + srcWorkerPort +
+                    " comp: " + srcComponentId + "["+ srcTaskId + "]}";
+        }
     }
 
     // We can't move this to outside without breaking backward compatibility.

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
index e3c15ac..fbe4cfa 100644
--- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
@@ -29,7 +29,6 @@ import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.ListDelegate;
 import org.apache.storm.utils.ReflectionUtils;
-import carbonite.JavaBridge;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
@@ -40,12 +39,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.TreeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SerializationFactory {
     public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class);
+    public static final ServiceLoader<SerializationRegister> loader = ServiceLoader.load(SerializationRegister.class);
 
     public static Kryo getKryo(Map conf) {
         IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
@@ -71,11 +72,15 @@ public class SerializationFactory {
         k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
         k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
         k.register(ConsList.class);
-        try {
-            JavaBridge.registerPrimitives(k);
-            JavaBridge.registerCollections(k);
-        } catch(Exception e) {
-            throw new RuntimeException(e);
+        
+        synchronized (loader) {
+            for (SerializationRegister sr: loader) {
+                try {
+                    sr.register(k);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
         Map<String, String> registrations = normalizeKryoRegister(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
new file mode 100644
index 0000000..3834c33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationRegister.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.serialization;
+
+import com.esotericsoftware.kryo.Kryo;
+
+/**
+ * Provides a way, using a service loader, to register Kryo
+ * serializers with the SerializationFactory without needing
+ * to modify the config.  This allows language-binding
+ * libraries or platforms to include their own registration
+ * without impacting a client's config.
+ */
+public interface SerializationRegister {
+    /**
+     * Register any serializers needed with the kryo instance
+     * @param kryo what to register the serializers with.
+     */
+    public void register(Kryo kryo) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
index a5ec72b..72b7f17 100644
--- a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java
@@ -24,6 +24,7 @@ import org.apache.storm.metric.api.rpc.IShellMetric;
 import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.multilang.SpoutMsg;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ShellProcess;
 
 import java.util.Arrays;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,9 +98,9 @@ public class ShellSpout implements ISpout {
         _context = context;
 
         if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
         } else {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
         }
 
         _process = new ShellProcess(_command);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
index 0ab48f0..6420546 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -641,12 +641,11 @@ public class StatsUtil {
 
     /**
      * aggregate topo executors stats
-     * TODO: change clojure maps to java HashMap's when nimbus.clj is translated to java
      *
      * @param topologyId     topology id
-     * @param exec2nodePort  executor -> host+port, note it's a clojure map
-     * @param task2component task -> component, note it's a clojure map
-     * @param beats          executor[start, end] -> executor heartbeat, note it's a java HashMap
+     * @param exec2nodePort  executor -> host+port
+     * @param task2component task -> component
+     * @param beats          executor[start, end] -> executor heartbeat
      * @param topology       storm topology
      * @param window         the window to be aggregated
      * @param includeSys     whether to include system streams
@@ -1232,8 +1231,8 @@ public class StatsUtil {
     /**
      * aggregate component executor stats
      *
-     * @param exec2hostPort  a Map of {executor -> host+port}, note it's a clojure map
-     * @param task2component a Map of {task id -> component}, note it's a clojure map
+     * @param exec2hostPort  a Map of {executor -> host+port}
+     * @param task2component a Map of {task id -> component}
      * @param beats          a converted HashMap of executor heartbeats, {executor -> heartbeat}
      * @param window         specified window
      * @param includeSys     whether to include system streams
@@ -1258,9 +1257,9 @@ public class StatsUtil {
      *
      * @param topologyId       topology id
      * @param topology         storm topology
-     * @param task2component   a Map of {task id -> component}, note it's a clojure map
+     * @param task2component   a Map of {task id -> component}
      * @param beats            a converted HashMap of executor heartbeats, {executor -> heartbeat}
-     * @param exec2hostPort    a Map of {executor -> host+port}, note it's a clojure map
+     * @param exec2hostPort    a Map of {executor -> host+port}
      * @param includeSys       whether to include system streams
      * @param userAuthorized   whether the user is authorized to view topology info
      * @param filterSupervisor if not null, only return WorkerSummaries for that supervisor
@@ -1363,9 +1362,9 @@ public class StatsUtil {
      *
      * @param topologyId       topology id
      * @param topology         storm topology
-     * @param task2component   a Map of {task id -> component}, note it's a clojure map
+     * @param task2component   a Map of {task id -> component}
      * @param beats            a converted HashMap of executor heartbeats, {executor -> heartbeat}
-     * @param exec2hostPort    a Map of {executor -> host+port}, note it's a clojure map
+     * @param exec2hostPort    a Map of {executor -> host+port}
      * @param includeSys       whether to include system streams
      * @param userAuthorized   whether the user is authorized to view topology info
      *
@@ -1499,8 +1498,8 @@ public class StatsUtil {
     /**
      * extract a list of host port info for specified component
      *
-     * @param exec2hostPort  {executor -> host+port}, note it's a clojure map
-     * @param task2component {task id -> component}, note it's a clojure map
+     * @param exec2hostPort  {executor -> host+port}
+     * @param task2component {task id -> component}
      * @param includeSys     whether to include system streams
      * @param compId         component id
      * @return a list of host+port
@@ -1690,24 +1689,11 @@ public class StatsUtil {
         return ret;
     }
 
-
-    /**
-     * convert a list of clojure executors to a java Set of List<Integer>
-     */
-    public static Set<List<Integer>> convertExecutors(Set executors) {
-        Set<List<Integer>> convertedExecutors = new HashSet<>();
-        for (Object executor : executors) {
-            List l = (List) executor;
-            convertedExecutors.add(convertExecutor(l));
-        }
-        return convertedExecutors;
-    }
-
     /**
-     * convert a clojure executor to java List<Integer>
+     * convert a List<Long> executor to java List<Integer>
      */
-    public static List<Integer> convertExecutor(List executor) {
-        return Lists.newArrayList(((Number) executor.get(0)).intValue(), ((Number) executor.get(1)).intValue());
+    public static List<Integer> convertExecutor(List<Long> executor) {
+        return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
     }
 
     /**
@@ -2249,14 +2235,12 @@ public class StatsUtil {
     }
 
     /**
-     * convert clojure structure to java maps
+     * convert the List<Long> executor ids that key the ZkHbs map to List<Integer> ids
      */
-    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map executorBeats) {
+    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
         Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
-        for (Object executorBeat : executorBeats.entrySet()) {
-            Map.Entry entry = (Map.Entry) executorBeat;
-            List startEnd = (List) entry.getKey();
-            ret.put(convertExecutor(startEnd), (ExecutorStats) entry.getValue());
+        for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+            ret.put(convertExecutor(entry.getKey()), entry.getValue());
         }
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
index 6acb19b..0acb409 100644
--- a/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/task/ShellBolt.java
@@ -27,9 +27,9 @@ import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.topology.ReportedFailedException;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ShellBoltMessageQueue;
 import org.apache.storm.utils.ShellProcess;
-import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,9 +140,9 @@ public class ShellBolt implements IBolt {
         _context = context;
 
         if (stormConf.containsKey(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS)) {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.TOPOLOGY_SUBPROCESS_TIMEOUT_SECS));
         } else {
-            workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            workerTimeoutMills = 1000 * ObjectReader.getInt(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
         }
 
         _process = new ShellProcess(_command);

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java b/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
index 598a54c..b508bf1 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/KeyedSummingBatchBolt.java
@@ -25,7 +25,6 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Utils;
-import clojure.lang.Numbers;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -44,7 +43,7 @@ public class KeyedSummingBatchBolt extends BaseBatchBolt {
     public void execute(Tuple tuple) {
         Object key = tuple.getValue(1);
         Number curr = Utils.get(_sums, key, 0);
-        _sums.put(key, Numbers.add(curr, tuple.getValue(2)));
+        _sums.put(key, curr.longValue()  + ((Number)tuple.getValue(2)).longValue());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java b/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
index 003d780..27f41bf 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/operation/builtin/Sum.java
@@ -17,7 +17,8 @@
  */
 package org.apache.storm.trident.operation.builtin;
 
-import clojure.lang.Numbers;
+import java.math.BigDecimal;
+
 import org.apache.storm.trident.operation.CombinerAggregator;
 import org.apache.storm.trident.tuple.TridentTuple;
 
@@ -31,12 +32,43 @@ public class Sum implements CombinerAggregator<Number> {
 
     @Override
     public Number combine(Number val1, Number val2) {
-        return Numbers.add(val1, val2);
+        // A BigDecimal operand promotes the whole sum to BigDecimal.
+        if (val1 instanceof BigDecimal || val2 instanceof BigDecimal) {
+            return asBigDecimal(val1).add(asBigDecimal(val2));
+        }
+        // Float is handled like Double; otherwise a Float operand would fall
+        // through to the long branch below and lose its fractional part.
+        if (isFloatingPoint(val1) || isFloatingPoint(val2)) {
+            return val1.doubleValue() + val2.doubleValue();
+        }
+        return val1.longValue() + val2.longValue();
+    }
+
+    /** Returns true when val is a Float or a Double. */
+    private static boolean isFloatingPoint(Number val) {
+        return val instanceof Double || val instanceof Float;
+    }
+
+    /** Converts val to BigDecimal, avoiding a lossy detour through double. */
+    private static BigDecimal asBigDecimal(Number val) {
+        if (val instanceof BigDecimal) {
+            return (BigDecimal) val;
+        }
+        if (val instanceof java.math.BigInteger) {
+            return new BigDecimal((java.math.BigInteger) val);
+        }
+        if (isFloatingPoint(val)) {
+            // valueOf goes through Double.toString, so 0.1 stays 0.1 instead of
+            // the long binary expansion produced by new BigDecimal(double).
+            return BigDecimal.valueOf(val.doubleValue());
+        }
+        // Remaining types are converted through long, which is exact for the
+        // boxed integral types; doubleValue() would round above 2^53.
+        return BigDecimal.valueOf(val.longValue());
     }
 
     @Override
     public Number zero() {
         return 0;
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java b/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
index 02f28c6..df85d80 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/tuple/TridentTupleView.java
@@ -19,9 +19,6 @@ package org.apache.storm.trident.tuple;
 
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import clojure.lang.IPersistentVector;
-import clojure.lang.PersistentVector;
-import clojure.lang.RT;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -35,9 +32,9 @@ import java.util.Arrays;
  * Extends AbstractList so that it can be emitted directly as Storm tuples
  */
 public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
-    ValuePointer[] _index;
-    Map<String, ValuePointer> _fieldIndex;
-    IPersistentVector _delegates;
+    private final ValuePointer[] _index;
+    private final Map<String, ValuePointer> _fieldIndex;
+    private final List<List<Object>> _delegates;
 
     public static class ProjectionFactory implements Factory {
         Map<String, ValuePointer> _fieldIndex;
@@ -90,7 +87,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(List<Object> selfVals) {
-            return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
+            return new TridentTupleView(Arrays.asList(selfVals), _index, _fieldIndex);
         }
 
         @Override
@@ -138,8 +135,8 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
-            IPersistentVector curr = parent._delegates;
-            curr = (IPersistentVector) RT.conj(curr, selfVals);
+            List<List<Object>> curr = new ArrayList<>(parent._delegates);
+            curr.add(selfVals);
             return new TridentTupleView(curr, _index, _fieldIndex);
         }
 
@@ -174,7 +171,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
         }
         
         public TridentTuple create(Tuple parent) {            
-            return new TridentTupleView(PersistentVector.EMPTY.cons(parent.getValues()), index, fieldIndex);
+            return new TridentTupleView(Arrays.asList(parent.getValues()), index, fieldIndex);
         }
 
         @Override
@@ -204,7 +201,7 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
     public static final TridentTupleView EMPTY_TUPLE = new TridentTupleView(null, new ValuePointer[0], new HashMap());
 
     // index and fieldIndex are precomputed, delegates built up over many operations using persistent data structures
-    public TridentTupleView(IPersistentVector delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
+    public TridentTupleView(List delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
         _delegates = delegates;
         _index = index;
         _fieldIndex = fieldIndex;
@@ -356,6 +353,6 @@ public class TridentTupleView extends AbstractList<Object> implements TridentTup
     }
 
     private Object getValueByPointer(ValuePointer ptr) {
-        return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);     
+        return _delegates.get(ptr.delegateIndex).get(ptr.index);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
index 88697ef..bf727e0 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
@@ -18,6 +18,7 @@
 package org.apache.storm.tuple;
 
 import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.GeneralTopologyContext;
 
 /**
  * The tuple is the main data structure in Storm. A tuple is a named list of values, 
@@ -65,4 +66,9 @@ public interface Tuple extends ITuple{
      * Gets the message id that associated with this tuple.
      */
     public MessageId getMessageId();
+
+    /**
+     * Gets the topology context associated with this tuple.
+     */
+    public GeneralTopologyContext getContext();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
index c793dfd..9356c4b 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
@@ -17,33 +17,38 @@
  */
 package org.apache.storm.tuple;
 
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.utils.IndifferentAccessMap;
-import clojure.lang.ASeq;
-import clojure.lang.Counted;
-import clojure.lang.IMeta;
-import clojure.lang.IPersistentMap;
-import clojure.lang.ISeq;
-import clojure.lang.Indexed;
-import clojure.lang.Keyword;
-import clojure.lang.MapEntry;
-import clojure.lang.Obj;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Seqable;
-import clojure.lang.Symbol;
-import java.util.List;
 
-public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
-    private List<Object> values;
-    private int taskId;
-    private String streamId;
-    private GeneralTopologyContext context;
-    private MessageId id;
-    private IPersistentMap _meta;
+public class TupleImpl implements Tuple {
+    private final List<Object> values;
+    private final int taskId;
+    private final String streamId;
+    private final GeneralTopologyContext context;
+    private final MessageId id;
+    private Long _processSampleStartTime;
+    private Long _executeSampleStartTime;
+    private long _outAckVal = 0;
     
+    public TupleImpl(Tuple t) {
+        this.values = t.getValues();
+        this.taskId = t.getSourceTask();
+        this.streamId = t.getSourceStreamId();
+        this.id = t.getMessageId();
+        this.context = t.getContext();
+        if (t instanceof TupleImpl) {
+            TupleImpl ti = (TupleImpl) t;
+            this._processSampleStartTime = ti._processSampleStartTime;
+            this._executeSampleStartTime = ti._executeSampleStartTime;
+            this._outAckVal = ti._outAckVal;
+        }
+    }
+
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
-        this.values = values;
+        this.values = Collections.unmodifiableList(values);
         this.taskId = taskId;
         this.streamId = streamId;
         this.id = id;
@@ -61,10 +66,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
 
     public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
         this(context, values, taskId, streamId, MessageId.makeUnanchored());
-    }    
-    
-    Long _processSampleStartTime;
-    Long _executeSampleStartTime;
+    }
     
     public void setProcessSampleStartTime(long ms) {
         _processSampleStartTime = ms;
@@ -82,8 +84,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return _executeSampleStartTime;
     }
     
-    long _outAckVal = 0;
-    
     public void updateAckVal(long val) {
         _outAckVal = _outAckVal ^ val;
     }
@@ -91,140 +91,176 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public long getAckVal() {
         return _outAckVal;
     }
-
+    
+    /* Tuple APIs */
+    @Override
     public int size() {
         return values.size();
     }
-    
+
+    @Override
     public int fieldIndex(String field) {
         return getFields().fieldIndex(field);
     }
     
+    @Override
     public boolean contains(String field) {
         return getFields().contains(field);
     }
     
+    @Override
     public Object getValue(int i) {
         return values.get(i);
     }
 
+    @Override
     public String getString(int i) {
         return (String) values.get(i);
     }
 
+    @Override
     public Integer getInteger(int i) {
         return (Integer) values.get(i);
     }
 
+    @Override
     public Long getLong(int i) {
         return (Long) values.get(i);
     }
 
+    @Override
     public Boolean getBoolean(int i) {
         return (Boolean) values.get(i);
     }
 
+    @Override
     public Short getShort(int i) {
         return (Short) values.get(i);
     }
 
+    @Override
     public Byte getByte(int i) {
         return (Byte) values.get(i);
     }
 
+    @Override
     public Double getDouble(int i) {
         return (Double) values.get(i);
     }
 
+    @Override
     public Float getFloat(int i) {
         return (Float) values.get(i);
     }
 
+    @Override
     public byte[] getBinary(int i) {
         return (byte[]) values.get(i);
     }
-    
-    
+
+    @Override
     public Object getValueByField(String field) {
         return values.get(fieldIndex(field));
     }
 
+    @Override
     public String getStringByField(String field) {
         return (String) values.get(fieldIndex(field));
     }
 
+    @Override
     public Integer getIntegerByField(String field) {
         return (Integer) values.get(fieldIndex(field));
     }
 
+    @Override
     public Long getLongByField(String field) {
         return (Long) values.get(fieldIndex(field));
     }
 
+    @Override
     public Boolean getBooleanByField(String field) {
         return (Boolean) values.get(fieldIndex(field));
     }
 
+    @Override
     public Short getShortByField(String field) {
         return (Short) values.get(fieldIndex(field));
     }
 
+    @Override
     public Byte getByteByField(String field) {
         return (Byte) values.get(fieldIndex(field));
     }
 
+    @Override
     public Double getDoubleByField(String field) {
         return (Double) values.get(fieldIndex(field));
     }
 
+    @Override
     public Float getFloatByField(String field) {
         return (Float) values.get(fieldIndex(field));
     }
 
+    @Override
     public byte[] getBinaryByField(String field) {
         return (byte[]) values.get(fieldIndex(field));
     }
-    
+
+    @Override
     public List<Object> getValues() {
         return values;
     }
-    
+
+    @Override    
     public Fields getFields() {
         return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
     }
 
+    @Override
     public List<Object> select(Fields selector) {
         return getFields().select(selector, values);
     }
     
-    @Deprecated
+    @Override
     public GlobalStreamId getSourceGlobalStreamid() {
         return getSourceGlobalStreamId();
     }
-    
+
+    @Override
     public GlobalStreamId getSourceGlobalStreamId() {
         return new GlobalStreamId(getSourceComponent(), streamId);
     }
-    
+
+    @Override
     public String getSourceComponent() {
         return context.getComponentId(taskId);
     }
-    
+
+    @Override
     public int getSourceTask() {
         return taskId;
     }
-    
+
+    @Override
     public String getSourceStreamId() {
         return streamId;
     }
-    
+
+    @Override
     public MessageId getMessageId() {
         return id;
     }
     
     @Override
+    public GeneralTopologyContext getContext() {
+        return context;
+    }
+    
+    @Override
     public String toString() {
-        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
+        return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString() + " PROC_START_TIME(sampled): " + _processSampleStartTime + " EXEC_START_TIME(sampled): " + _executeSampleStartTime;
     }
     
     @Override
@@ -236,121 +272,4 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public int hashCode() {
         return System.identityHashCode(this);
     }
-
-    private Keyword makeKeyword(String name) {
-        return Keyword.intern(Symbol.create(name));
-    }    
-
-    /* ILookup */
-    @Override
-    public Object valAt(Object o) {
-        try {
-            if(o instanceof Keyword) {
-                return getValueByField(((Keyword) o).getName());
-            } else if(o instanceof String) {
-                return getValueByField((String) o);
-            }
-        } catch(IllegalArgumentException ignored) {
-        }
-        return null;
-    }
-
-    /* Seqable */
-    public ISeq seq() {
-        if(values.size() > 0) {
-            return new Seq(getFields().toList(), values, 0);
-        }
-        return null;
-    }
-
-    static class Seq extends ASeq implements Counted {
-        final List<String> fields;
-        final List<Object> values;
-        final int i;
-
-        Seq(List<String> fields, List<Object> values, int i) {
-            this.fields = fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
-            super(meta);
-            this.fields= fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Object first() {
-            return new MapEntry(fields.get(i), values.get(i));
-        }
-
-        public ISeq next() {
-            if(i+1 < fields.size()) {
-                return new Seq(fields, values, i+1);
-            }
-            return null;
-        }
-
-        public int count() {
-            assert fields.size() -i >= 0 : "index out of bounds";
-            // i being the position in the fields of this seq, the remainder of the seq is the size
-            return fields.size() -i;
-        }
-
-        public Obj withMeta(IPersistentMap meta) {
-            return new Seq(meta, fields, values, i);
-        }
-    }
-
-    /* Indexed */
-    public Object nth(int i) {
-        if(i < values.size()) {
-            return values.get(i);
-        } else {
-            return null;
-        }
-    }
-
-    public Object nth(int i, Object notfound) {
-        Object ret = nth(i);
-        if(ret==null) ret = notfound;
-        return ret;
-    }
-
-    /* Counted */
-    public int count() {
-        return values.size();
-    }
-    
-    /* IMeta */
-    public IPersistentMap meta() {
-        if(_meta==null) {
-            _meta = new PersistentArrayMap( new Object[] {
-            makeKeyword("stream"), getSourceStreamId(), 
-            makeKeyword("component"), getSourceComponent(), 
-            makeKeyword("task"), getSourceTask()});
-        }
-        return _meta;
-    }
-
-    private PersistentArrayMap toMap() {
-        Object array[] = new Object[values.size()*2];
-        List<String> fields = getFields().toList();
-        for(int i=0; i < values.size(); i++) {
-            array[i*2] = fields.get(i);
-            array[(i*2)+1] = values.get(i);
-        }
-        return new PersistentArrayMap(array);
-    }
-
-    public IPersistentMap getMap() {
-        if(_map==null) {
-            setMap(toMap());
-        }
-        return _map;
-    }    
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java b/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
deleted file mode 100644
index 2675ab7..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/IndifferentAccessMap.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-
-import clojure.lang.ILookup;
-import clojure.lang.ISeq;
-import clojure.lang.AFn;
-import clojure.lang.IPersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.IMapEntry;
-import clojure.lang.IPersistentCollection;
-import clojure.lang.Keyword;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Collection;
-import java.util.Set;
-
-public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {
-
-    protected IPersistentMap _map;
-
-    protected IndifferentAccessMap() {
-    }
-
-    public IndifferentAccessMap(IPersistentMap map) {
-        setMap(map);
-    }
-
-    public IPersistentMap getMap() {
-        return _map;
-    }
-
-    public IPersistentMap setMap(IPersistentMap map) {
-        _map = map;
-        return _map;
-    }
-
-    public int size() {
-        return ((Map) getMap()).size();
-    }
-
-    public int count() {
-        return size();
-    }
-
-    public ISeq seq() {
-        return getMap().seq();
-    }
-
-    @Override
-    public Object valAt(Object o) {
-        if(o instanceof Keyword) {
-            return valAt(((Keyword) o).getName());
-        }
-        return getMap().valAt(o);
-    }
-    
-    @Override
-    public Object valAt(Object o, Object def) {
-        Object ret = valAt(o);
-        if(ret==null) ret = def;
-        return ret;
-    }
-
-    /* IFn */
-    @Override
-    public Object invoke(Object o) {
-        return valAt(o);
-    }
-
-    @Override
-    public Object invoke(Object o, Object notfound) {
-        return valAt(o, notfound);
-    }
-
-    /* IPersistentMap */
-    /* Naive implementation, but it might be good enough */
-    public IPersistentMap assoc(Object k, Object v) {
-        if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
-        
-        return new IndifferentAccessMap(getMap().assoc(k, v));
-    }
-
-    public IPersistentMap assocEx(Object k, Object v) {
-        if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
-
-        return new IndifferentAccessMap(getMap().assocEx(k, v));
-    }
-
-    public IPersistentMap without(Object k) {
-        if(k instanceof Keyword) return without(((Keyword) k).getName());
-
-        return new IndifferentAccessMap(getMap().without(k));
-    }
-
-    public boolean containsKey(Object k) {
-        if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
-        return getMap().containsKey(k);
-    }
-
-    public IMapEntry entryAt(Object k) {
-        if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
-
-        return getMap().entryAt(k);
-    }
-
-    public IPersistentCollection cons(Object o) {
-        return getMap().cons(o);
-    }
-
-    public IPersistentCollection empty() {
-        return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
-    }
-
-    public boolean equiv(Object o) {
-        return getMap().equiv(o);
-    }
-
-    public Iterator iterator() {
-        return getMap().iterator();
-    }
-
-    /* Map */
-    public boolean containsValue(Object v) {
-        return ((Map) getMap()).containsValue(v);
-    }
-
-    public Set entrySet() {
-        return ((Map) getMap()).entrySet();
-    }
-
-    public Object get(Object k) {
-        return valAt(k);
-    }
-
-    public boolean isEmpty() {
-        return ((Map) getMap()).isEmpty();
-    }
-
-    public Set keySet() {
-        return ((Map) getMap()).keySet();
-    }
-
-    public Collection values() {
-        return ((Map) getMap()).values();
-    }
-    
-    /* Not implemented */
-    public void clear() {
-        throw new UnsupportedOperationException();
-    }
-    public Object put(Object k, Object v) {
-        throw new UnsupportedOperationException();
-    }
-    public void putAll(Map m) {
-        throw new UnsupportedOperationException();
-    }
-    public Object remove(Object k) {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 771dc70..effaba2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.utils;
 
-import clojure.lang.RT;
 import org.apache.storm.Config;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -864,15 +863,6 @@ public class Utils {
         return findAndReadConfigFile(name, true);
     }
 
-    public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
-        try {
-            clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
-        } catch (Exception e) {
-            //if playing from the repl and defining functions, file won't exist
-        }
-        return (clojure.lang.IFn) RT.var(namespace, name).deref();
-    }
-
     /**
      * "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}"
      * Reverses an assoc-list style Map like reverseMap(Map...)

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/pom.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/pom.xml b/storm-clojure/pom.xml
index cef8fd0..f525d01 100644
--- a/storm-clojure/pom.xml
+++ b/storm-clojure/pom.xml
@@ -27,6 +27,10 @@
 
     <artifactId>storm-clojure</artifactId>
 
+    <properties>
+        <argLine></argLine>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -43,6 +47,17 @@
             <artifactId>json-simple</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>carbonite</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build> 
@@ -55,15 +70,41 @@
                     <sourceDirectories>
                         <sourceDirectory>src/clj</sourceDirectory>
                     </sourceDirectories>
+                    <testSourceDirectories>
+                        <testSourceDirectory>test/clj</testSourceDirectory>
+                    </testSourceDirectories>
+                    <warnOnReflection>false</warnOnReflection>
+                    <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
+                    <copiedNamespaces>
+                        <copiedNamespace>none</copiedNamespace>
+                    </copiedNamespaces>
                 </configuration>
                 <executions>
                     <execution>
-                        <id>compile</id>
+                        <id>compile-clojure</id>
                         <phase>compile</phase>
                         <goals>
                             <goal>compile</goal>
                         </goals>
                     </execution>
+                    <execution>
+                        <id>test-clojure</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                        <configuration>
+                            <junitOutput>true</junitOutput>
+                            <testScript>./test/resources/test_runner.clj</testScript>
+                            <!-- argLine is set by JaCoCo for code coverage -->
+                            <vmargs>-Xmx1536m ${argLine} ${test.extra.args}</vmargs>
+                            <!-- Run clojure unit tests or all tests (including integration tests) depending on the profile enabled -->
+                            <testNamespaces>
+                                <testNamespace>${clojure.test.set}</testNamespace>
+                            </testNamespaces>
+                            <testDeclaredNamespaceOnly>${clojure.test.declared.namespace.only}</testDeclaredNamespaceOnly>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
new file mode 100644
index 0000000..1e9b8ba
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureBolt.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.coordination.CoordinatedBolt.FinishedCallback;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+
+import clojure.lang.IFn;
+import clojure.lang.Keyword;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import clojure.lang.Symbol;
+
+public class ClojureBolt implements IRichBolt, FinishedCallback {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+
+    IBolt _bolt;
+
+    public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+
+    @Override
+    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
+        IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                    Keyword.intern(Symbol.create("output-collector")), collector,
+                    Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(stormConf);
+                add(context);
+                add(collectorMap);
+            }};
+
+            _bolt = (IBolt) preparer.applyTo(RT.seq(args));
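+            // Mostly redundant for Clojure bolts: the preparer above already received conf, context and collector; a missing prepare (AbstractMethodError) is ignored.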
+            try {
+                _bolt.prepare(stormConf, context, collector);
+            } catch(AbstractMethodError ame) {
+
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        _bolt.execute(new ClojureTuple(input));
+    }
+
+    @Override
+    public void cleanup() {
+        try {
+            _bolt.cleanup();
+        } catch(AbstractMethodError ame) {
+
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+
+    @Override
+    public void finishedId(Object id) {
+        if(_bolt instanceof FinishedCallback) {
+            ((FinishedCallback) _bolt).finishedId(id);
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
new file mode 100644
index 0000000..df0840b
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSerializationRegister.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.serialization.SerializationRegister;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import carbonite.JavaBridge;
+
+public class ClojureSerializationRegister implements SerializationRegister {
+
+    @Override
+    public void register(Kryo kryo) throws Exception {
+        JavaBridge.registerPrimitives(kryo);
+        JavaBridge.registerCollections(kryo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
new file mode 100644
index 0000000..efa3dd6
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureSpout.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import clojure.lang.IFn;
+import clojure.lang.Keyword;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.RT;
+import clojure.lang.Symbol;
+
+public class ClojureSpout implements IRichSpout {
+    Map<String, StreamInfo> _fields;
+    List<String> _fnSpec;
+    List<String> _confSpec;
+    List<Object> _params;
+    
+    ISpout _spout;
+    
+    public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) {
+        _fnSpec = fnSpec;
+        _confSpec = confSpec;
+        _params = params;
+        _fields = fields;
+    }
+    
+
+    @Override
+    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        IFn hof = ClojureUtil.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
+        try {
+            IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
+            final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] {
+                Keyword.intern(Symbol.create("output-collector")), collector,
+                Keyword.intern(Symbol.create("context")), context});
+            List<Object> args = new ArrayList<Object>() {{
+                add(conf);
+                add(context);
+                add(collectorMap);
+            }};
+            
+            _spout = (ISpout) preparer.applyTo(RT.seq(args));
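+            // Mostly redundant for Clojure spouts: the preparer above already received conf, context and collector; a missing open (AbstractMethodError) is ignored.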
+            try {
+                _spout.open(conf, context, collector);
+            } catch(AbstractMethodError ame) {
+                
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            _spout.close();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        try {
+            _spout.nextTuple();
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        try {
+            _spout.ack(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        try {
+            _spout.fail(msgId);
+        } catch(AbstractMethodError ame) {
+                
+        }
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for(String stream: _fields.keySet()) {
+            StreamInfo info = _fields.get(stream);
+            declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields()));
+        }
+    }
+    
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        IFn hof = ClojureUtil.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
+        try {
+            return (Map) hof.applyTo(RT.seq(_params));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void activate() {
+        try {
+            _spout.activate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        try {
+            _spout.deactivate();
+        } catch(AbstractMethodError ame) {
+                
+        }
+    }
+}


[6/6] storm git commit: Added STORM-2468 to Changelog

Posted by bo...@apache.org.
Added STORM-2468 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2ec5581b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2ec5581b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2ec5581b

Branch: refs/heads/master
Commit: 2ec5581bb904971cee671cd1cd9b89495fca6636
Parents: e6a7e26
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 27 20:43:44 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 27 20:43:44 2017 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2ec5581b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 556f591..f7d935f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 \ufeff## 2.0.0
+ * STORM-2468: Remove clojure from storm-client
  * STORM-2490: Lambda support
  * STORM-2349: Add one RocketMQ plugin for the Apache Storm
  * STORM-2481: Upgrade Aether version to resolve Aether bug BUG-451566


[5/6] storm git commit: Merge branch 'STORM-2468' of https://github.com/revans2/incubator-storm into STORM-2468

Posted by bo...@apache.org.
Merge branch 'STORM-2468' of https://github.com/revans2/incubator-storm into STORM-2468

STORM-2468: Remove clojure from storm-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6a7e269
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6a7e269
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6a7e269

Branch: refs/heads/master
Commit: e6a7e269d55fa9aea4e3994caaeb847824ef6c0d
Parents: de2907f 8e1128a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu Apr 27 19:11:51 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu Apr 27 19:11:51 2017 -0500

----------------------------------------------------------------------
 docs/Serialization.md                           |  10 +-
 .../org/apache/storm/starter/clj/bolts_test.clj |  26 +-
 storm-client/pom.xml                            |  11 -
 .../org/apache/storm/clojure/ClojureBolt.java   | 120 -----
 .../org/apache/storm/clojure/ClojureSpout.java  | 153 -------
 .../org/apache/storm/clojure/RichShellBolt.java |  51 ---
 .../apache/storm/clojure/RichShellSpout.java    |  51 ---
 .../org/apache/storm/cluster/IStateStorage.java |   1 -
 .../storm/cluster/StormClusterStateImpl.java    |   6 -
 .../storm/executor/bolt/BoltExecutor.java       |   4 +-
 .../executor/bolt/BoltOutputCollectorImpl.java  |   9 +-
 .../storm/metric/api/IMetricsConsumer.java      |   8 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../serialization/SerializationRegister.java    |  35 ++
 .../jvm/org/apache/storm/spout/ShellSpout.java  |   6 +-
 .../jvm/org/apache/storm/stats/StatsUtil.java   |  52 +--
 .../jvm/org/apache/storm/task/ShellBolt.java    |   6 +-
 .../storm/testing/KeyedSummingBatchBolt.java    |   3 +-
 .../storm/trident/operation/builtin/Sum.java    |  24 +-
 .../storm/trident/tuple/TridentTupleView.java   |  21 +-
 .../src/jvm/org/apache/storm/tuple/Tuple.java   |   6 +
 .../jvm/org/apache/storm/tuple/TupleImpl.java   | 235 ++++------
 .../storm/utils/IndifferentAccessMap.java       | 177 --------
 .../src/jvm/org/apache/storm/utils/Utils.java   |  10 -
 storm-clojure/pom.xml                           |  43 +-
 .../org/apache/storm/clojure/ClojureBolt.java   | 119 +++++
 .../clojure/ClojureSerializationRegister.java   |  33 ++
 .../org/apache/storm/clojure/ClojureSpout.java  | 154 +++++++
 .../org/apache/storm/clojure/ClojureTuple.java  | 439 +++++++++++++++++++
 .../org/apache/storm/clojure/ClojureUtil.java   |  31 ++
 .../storm/clojure/IndifferentAccessMap.java     | 176 ++++++++
 .../org/apache/storm/clojure/RichShellBolt.java |  51 +++
 .../apache/storm/clojure/RichShellSpout.java    |  51 +++
 ...he.storm.serialization.SerializationRegister |  17 +
 .../test/clj/org/apache/storm/tuple_test.clj    |  53 +++
 storm-clojure/test/resources/log4j2-test.xml    |  32 ++
 storm-clojure/test/resources/test_runner.clj    | 114 +++++
 storm-core/pom.xml                              | 193 +-------
 storm-core/src/clj/org/apache/storm/config.clj  |  37 --
 .../clj/org/apache/storm/daemon/logviewer.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon_config.clj  |  30 ++
 .../clj/org/apache/storm/internal/clojure.clj   | 204 ---------
 storm-core/src/clj/org/apache/storm/log.clj     |  34 --
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 134 ------
 .../org/apache/storm/integration_test.clj       |   2 +-
 .../org/apache/storm/testing4j_test.clj         |   4 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |   2 +-
 .../test/clj/org/apache/storm/grouping_test.clj |   2 +-
 .../clj/org/apache/storm/logviewer_test.clj     |   2 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   2 +-
 .../scheduler/multitenant_scheduler_test.clj    |   2 +-
 .../apache/storm/security/auth/auth_test.clj    |   2 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   2 +-
 .../clj/org/apache/storm/transactional_test.clj |   2 +-
 .../test/clj/org/apache/storm/tuple_test.clj    |  52 ---
 storm-core/test/resources/test_runner.clj       | 114 -----
 58 files changed, 1578 insertions(+), 1603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e6a7e269/storm-client/pom.xml
----------------------------------------------------------------------


[2/6] storm git commit: STORM-2468: Remove clojure from storm-client

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
new file mode 100644
index 0000000..8de15eb
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+
+import clojure.lang.AFn;
+import clojure.lang.ASeq;
+import clojure.lang.ArityException;
+import clojure.lang.Counted;
+import clojure.lang.IFn;
+import clojure.lang.ILookup;
+import clojure.lang.IMapEntry;
+import clojure.lang.IMeta;
+import clojure.lang.IPersistentCollection;
+import clojure.lang.IPersistentMap;
+import clojure.lang.ISeq;
+import clojure.lang.Indexed;
+import clojure.lang.Keyword;
+import clojure.lang.MapEntry;
+import clojure.lang.Obj;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Seqable;
+import clojure.lang.Symbol;
+
+/**
+ * A {@link TupleImpl} that additionally implements the Clojure collection
+ * interfaces, so that a tuple can be read from Clojure like a map keyed by
+ * field name, indexed by position, or called as a function.
+ *
+ * <p>Lookups accept either a String or a Keyword; a Keyword is resolved through
+ * its name. The map view is read-only: {@code assoc}/{@code assocEx}/{@code without}
+ * return a new {@link IndifferentAccessMap}, and the mutating {@link Map}
+ * methods throw {@link UnsupportedOperationException}.
+ */
+public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, ILookup, IPersistentMap, Map, IFn {
+    /** Cached result of {@link #meta()}; built on first use, without synchronization. */
+    private IPersistentMap _meta;
+    /** Cached result of {@link #getMap()}; built on first use, without synchronization. */
+    private IPersistentMap _map;
+
+    /**
+     * Creates a Clojure-friendly view by handing the given tuple to the
+     * {@link TupleImpl} constructor.
+     *
+     * @param t the tuple to wrap
+     */
+    public ClojureTuple(Tuple t) {
+        super(t);
+    }
+
+    /** Interns a keyword with the given (namespace-less) name. */
+    private Keyword makeKeyword(String name) {
+        return Keyword.intern(Symbol.create(name));
+    }
+
+    /**
+     * Returns the field name denoted by a lookup key.
+     *
+     * @param key a Keyword (its name is used) or a String
+     * @return the field name, or {@code null} when the key is of any other type
+     */
+    private static String fieldNameOf(Object key) {
+        if (key instanceof Keyword) {
+            return ((Keyword) key).getName();
+        }
+        if (key instanceof String) {
+            return (String) key;
+        }
+        return null;
+    }
+
+    /** Builds a persistent map of field name (String) to value, in field order. */
+    private PersistentArrayMap toMap() {
+        final int fieldCount = size();
+        final Object[] keyVals = new Object[fieldCount * 2];
+        final List<String> fields = getFields().toList();
+        for (int i = 0; i < fieldCount; i++) {
+            keyVals[i * 2] = fields.get(i);
+            keyVals[(i * 2) + 1] = getValue(i);
+        }
+        return new PersistentArrayMap(keyVals);
+    }
+
+    /**
+     * Returns the persistent-map view of this tuple, building and caching it on
+     * the first call.
+     */
+    public IPersistentMap getMap() {
+        if (_map == null) {
+            _map = toMap();
+        }
+        return _map;
+    }
+
+    /* ILookup */
+    /**
+     * Looks up a field by name.
+     *
+     * @param o a Keyword or String naming the field
+     * @return the field's value, or {@code null} when the key is of another type
+     *         or the field lookup fails
+     */
+    @Override
+    public Object valAt(Object o) {
+        return valAt(o, null);
+    }
+
+    /**
+     * Looks up a field by name, falling back to a default.
+     *
+     * <p>The default is returned only when the field cannot be looked up. A
+     * field that exists and holds {@code null} yields {@code null}, so "absent"
+     * and "null-valued" are not conflated.
+     *
+     * @param o   a Keyword or String naming the field
+     * @param def the value to return when the field cannot be looked up
+     * @return the field's value (possibly {@code null}) or {@code def}
+     */
+    @Override
+    public Object valAt(Object o, Object def) {
+        String field = fieldNameOf(o);
+        if (field == null) {
+            return def;
+        }
+        try {
+            return getValueByField(field);
+        } catch (IllegalArgumentException missing) {
+            // presumably raised for an unknown field name; treated as "not found"
+            return def;
+        }
+    }
+
+    /* Seqable */
+    /**
+     * Returns a seq of (field name, value) map entries, or {@code null} when the
+     * tuple has no fields.
+     */
+    @Override
+    public ISeq seq() {
+        if(size() > 0) {
+            return new Seq(getFields().toList(), getValues(), 0);
+        }
+        return null;
+    }
+
+    /**
+     * Seq over parallel lists of field names and values, positioned at index
+     * {@code i}. Each element is a {@link MapEntry} of name to value.
+     */
+    static class Seq extends ASeq implements Counted {
+        private static final long serialVersionUID = 1L;
+        final List<String> fields;
+        final List<Object> values;
+        final int i;
+
+        Seq(List<String> fields, List<Object> values, int i) {
+            this.fields = fields;
+            this.values = values;
+            assert i >= 0;
+            this.i = i;
+        }
+
+        public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
+            super(meta);
+            this.fields= fields;
+            this.values = values;
+            assert i >= 0;
+            this.i = i;
+        }
+
+        @Override
+        public Object first() {
+            return new MapEntry(fields.get(i), values.get(i));
+        }
+
+        @Override
+        public ISeq next() {
+            if(i+1 < fields.size()) {
+                return new Seq(fields, values, i+1);
+            }
+            return null;
+        }
+
+        @Override
+        public int count() {
+            assert fields.size() -i >= 0 : "index out of bounds";
+            // i being the position in the fields of this seq, the remainder of the seq is the size
+            return fields.size() -i;
+        }
+
+        @Override
+        public Obj withMeta(IPersistentMap meta) {
+            return new Seq(meta, fields, values, i);
+        }
+    }
+
+    /* Indexed */
+    /**
+     * Returns the value at position {@code i}, or {@code null} when {@code i} is
+     * outside {@code [0, size())}.
+     */
+    @Override
+    public Object nth(int i) {
+        return nth(i, null);
+    }
+
+    /**
+     * Returns the value at position {@code i}, or {@code notfound} when
+     * {@code i} is outside {@code [0, size())}. A negative index is treated like
+     * any other out-of-range index, and an in-range {@code null} value is
+     * returned as {@code null} rather than being replaced by {@code notfound}.
+     */
+    @Override
+    public Object nth(int i, Object notfound) {
+        if (i >= 0 && i < size()) {
+            return getValue(i);
+        }
+        return notfound;
+    }
+
+    /* Counted */
+    /** Returns the number of values in the tuple (same as {@code size()}). */
+    @Override
+    public int count() {
+        return size();
+    }
+
+    /* IMeta */
+    /**
+     * Returns a map with the keys {@code :stream}, {@code :component} and
+     * {@code :task} describing where the tuple came from; built once and cached.
+     */
+    @Override
+    public IPersistentMap meta() {
+        if(_meta==null) {
+            _meta = new PersistentArrayMap( new Object[] {
+                    makeKeyword("stream"), getSourceStreamId(),
+                    makeKeyword("component"), getSourceComponent(),
+                    makeKeyword("task"), getSourceTask()});
+        }
+        return _meta;
+    }
+
+    /* IFn */
+    /* Only the 1- and 2-argument forms are supported; every other arity throws ArityException. */
+    @Override
+    public Object invoke(Object o) {
+        return valAt(o);
+    }
+
+    @Override
+    public Object invoke(Object o, Object notfound) {
+        return valAt(o, notfound);
+    }
+
+    @Override
+    public Object invoke() {
+        throw new ArityException(0, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3) {
+        throw new ArityException(3, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4) {
+        throw new ArityException(4, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5) {
+        throw new ArityException(5, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6) {
+        throw new ArityException(6, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7) {
+        throw new ArityException(7, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8) {
+        throw new ArityException(8, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9) {
+        throw new ArityException(9, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10) {
+        throw new ArityException(10, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11) {
+        throw new ArityException(11, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12) {
+        throw new ArityException(12, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13) {
+        throw new ArityException(13, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14) {
+        throw new ArityException(14, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15) {
+        throw new ArityException(15, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16) {
+        throw new ArityException(16, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16, Object arg17) {
+        throw new ArityException(17, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16, Object arg17, Object arg18) {
+        throw new ArityException(18, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16, Object arg17, Object arg18, Object arg19) {
+        throw new ArityException(19, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20) {
+        throw new ArityException(20, "1 or 2 args only");
+    }
+
+    @Override
+    public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+            Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+            Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20, Object... args) {
+        throw new ArityException(21, "1 or 2 args only");
+    }
+
+    /** Dispatches an argument seq to the matching {@code invoke} overload. */
+    @Override
+    public Object applyTo(ISeq arglist) {
+        return AFn.applyToHelper(this, arglist);
+    }
+
+    /** Calls the zero-argument {@link #invoke()}, which always throws ArityException. */
+    @Override
+    public Object call() throws Exception {
+        return invoke();
+    }
+
+    /** Calls the zero-argument {@link #invoke()}, which always throws ArityException. */
+    @Override
+    public void run() {
+        invoke();
+    }
+
+    /* IPersistentMap */
+    /* Naive implementation, but it might be good enough */
+    /* Keyword keys are converted to their String name before delegating to the map view. */
+    @Override
+    public IPersistentMap assoc(Object k, Object v) {
+        if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
+
+        return new IndifferentAccessMap(getMap().assoc(k, v));
+    }
+
+    @Override
+    public IPersistentMap assocEx(Object k, Object v) {
+        if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+
+        return new IndifferentAccessMap(getMap().assocEx(k, v));
+    }
+
+    @Override
+    public IPersistentMap without(Object k) {
+        if(k instanceof Keyword) return without(((Keyword) k).getName());
+
+        return new IndifferentAccessMap(getMap().without(k));
+    }
+
+    @Override
+    public boolean containsKey(Object k) {
+        if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+        return getMap().containsKey(k);
+    }
+
+    @Override
+    public IMapEntry entryAt(Object k) {
+        if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+
+        return getMap().entryAt(k);
+    }
+
+    @Override
+    public IPersistentCollection cons(Object o) {
+        return getMap().cons(o);
+    }
+
+    @Override
+    public IPersistentCollection empty() {
+        return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
+    }
+
+    @Override
+    public boolean equiv(Object o) {
+        return getMap().equiv(o);
+    }
+
+    @Override
+    public Iterator iterator() {
+        return getMap().iterator();
+    }
+
+    /* Map */
+    /* Read-only operations delegate to the map view, cast to java.util.Map. */
+    @Override
+    public boolean containsValue(Object v) {
+        return ((Map) getMap()).containsValue(v);
+    }
+
+    @Override
+    public Set entrySet() {
+        return ((Map) getMap()).entrySet();
+    }
+
+    @Override
+    public Object get(Object k) {
+        return valAt(k);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return ((Map) getMap()).isEmpty();
+    }
+
+    @Override
+    public Set keySet() {
+        return ((Map) getMap()).keySet();
+    }
+
+    @Override
+    public Collection values() {
+        return ((Map) getMap()).values();
+    }
+
+    /* Not implemented */
+    /* The map view is read-only, so every mutating Map method is rejected. */
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object put(Object k, Object v) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putAll(Map m) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object remove(Object k) {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
new file mode 100644
index 0000000..0bfe9fb
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import clojure.lang.RT;
+
+/** Helpers for calling into Clojure code from Java. */
+public class ClojureUtil {
+    /**
+     * Loads a Clojure function by namespace and var name.
+     *
+     * <p>The namespace is first required on a best-effort basis: any exception
+     * raised while requiring it is ignored, so that functions defined
+     * interactively (with no backing file) can still be resolved. The var is
+     * then looked up and dereferenced.
+     *
+     * @param namespace the Clojure namespace that defines the function
+     * @param name      the name of the var holding the function
+     * @return the current value of the var, cast to {@link clojure.lang.IFn}
+     * @throws ClassCastException if the var's value is not an {@code IFn}
+     */
+    public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
+        try {
+            // Invoke clojure.core/require with a symbol rather than concatenating the
+            // namespace into a "(require '...)" source string and evaluating it: the
+            // namespace text is then never parsed as code.
+            RT.var("clojure.core", "require").invoke(clojure.lang.Symbol.intern(namespace));
+        } catch (Exception e) {
+            //if playing from the repl and defining functions, file won't exist
+        }
+        return (clojure.lang.IFn) RT.var(namespace, name).deref();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
new file mode 100644
index 0000000..b30788a
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import clojure.lang.ILookup;
+import clojure.lang.ISeq;
+import clojure.lang.AFn;
+import clojure.lang.IPersistentMap;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.IMapEntry;
+import clojure.lang.IPersistentCollection;
+import clojure.lang.Keyword;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Read-only wrapper around an {@link IPersistentMap} that treats a Keyword key
+ * and the String of that keyword's name as the same key: every keyed operation
+ * converts a Keyword to its name before delegating to the wrapped map.
+ *
+ * <p>{@code assoc}/{@code assocEx}/{@code without} return a new wrapper around
+ * the updated map; the mutating {@link Map} methods throw
+ * {@link UnsupportedOperationException}.
+ */
+public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {
+
+    /** The wrapped map; stays {@code null} until set when the no-arg constructor is used. */
+    protected IPersistentMap _map;
+
+    /** Creates a wrapper with no map; one must be supplied via {@link #setMap}. */
+    protected IndifferentAccessMap() {
+    }
+
+    /**
+     * Creates a wrapper around the given map.
+     *
+     * @param map the map to delegate to
+     */
+    public IndifferentAccessMap(IPersistentMap map) {
+        setMap(map);
+    }
+
+    /** Returns the wrapped map. */
+    public IPersistentMap getMap() {
+        return _map;
+    }
+
+    /**
+     * Replaces the wrapped map.
+     *
+     * @param map the new map to delegate to
+     * @return the map that was just set
+     */
+    public IPersistentMap setMap(IPersistentMap map) {
+        _map = map;
+        return _map;
+    }
+
+    @Override
+    public int size() {
+        return ((Map) getMap()).size();
+    }
+
+    @Override
+    public int count() {
+        return size();
+    }
+
+    @Override
+    public ISeq seq() {
+        return getMap().seq();
+    }
+
+    /**
+     * Looks up a key, converting a Keyword to its name first.
+     *
+     * @return the mapped value, or {@code null} when there is none
+     */
+    @Override
+    public Object valAt(Object o) {
+        if(o instanceof Keyword) {
+            return valAt(((Keyword) o).getName());
+        }
+        return getMap().valAt(o);
+    }
+
+    /**
+     * Looks up a key, falling back to a default only when the key is absent.
+     *
+     * <p>A key that is present and mapped to {@code null} yields {@code null},
+     * so "absent" and "null-valued" are not conflated.
+     *
+     * @param o   the key (Keyword or any other key type)
+     * @param def the value to return when the key is absent
+     */
+    @Override
+    public Object valAt(Object o, Object def) {
+        Object ret = valAt(o);
+        // Only substitute the default when the key is really missing.
+        if(ret==null && !containsKey(o)) ret = def;
+        return ret;
+    }
+
+    /* IFn */
+    @Override
+    public Object invoke(Object o) {
+        return valAt(o);
+    }
+
+    @Override
+    public Object invoke(Object o, Object notfound) {
+        return valAt(o, notfound);
+    }
+
+    /* IPersistentMap */
+    /* Naive implementation, but it might be good enough */
+    @Override
+    public IPersistentMap assoc(Object k, Object v) {
+        if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
+
+        return new IndifferentAccessMap(getMap().assoc(k, v));
+    }
+
+    @Override
+    public IPersistentMap assocEx(Object k, Object v) {
+        if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+
+        return new IndifferentAccessMap(getMap().assocEx(k, v));
+    }
+
+    @Override
+    public IPersistentMap without(Object k) {
+        if(k instanceof Keyword) return without(((Keyword) k).getName());
+
+        return new IndifferentAccessMap(getMap().without(k));
+    }
+
+    @Override
+    public boolean containsKey(Object k) {
+        if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+        return getMap().containsKey(k);
+    }
+
+    @Override
+    public IMapEntry entryAt(Object k) {
+        if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+
+        return getMap().entryAt(k);
+    }
+
+    @Override
+    public IPersistentCollection cons(Object o) {
+        return getMap().cons(o);
+    }
+
+    @Override
+    public IPersistentCollection empty() {
+        return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
+    }
+
+    @Override
+    public boolean equiv(Object o) {
+        return getMap().equiv(o);
+    }
+
+    @Override
+    public Iterator iterator() {
+        return getMap().iterator();
+    }
+
+    /* Map */
+    /* Read-only operations delegate to the wrapped map, cast to java.util.Map. */
+    @Override
+    public boolean containsValue(Object v) {
+        return ((Map) getMap()).containsValue(v);
+    }
+
+    @Override
+    public Set entrySet() {
+        return ((Map) getMap()).entrySet();
+    }
+
+    @Override
+    public Object get(Object k) {
+        return valAt(k);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return ((Map) getMap()).isEmpty();
+    }
+
+    @Override
+    public Set keySet() {
+        return ((Map) getMap()).keySet();
+    }
+
+    @Override
+    public Collection values() {
+        return ((Map) getMap()).values();
+    }
+
+    /* Not implemented */
+    /* The wrapper is read-only, so every mutating Map method is rejected. */
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object put(Object k, Object v) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putAll(Map m) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object remove(Object k) {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
new file mode 100644
index 0000000..6de5637
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+/**
+ * A {@link ShellBolt} that also implements {@link IRichBolt} by declaring its
+ * output streams from a map of stream id to {@link StreamInfo} given at
+ * construction time.
+ */
+public class RichShellBolt extends ShellBolt implements IRichBolt {
+    /** Stream id to stream description, as passed to the constructor. */
+    private Map<String, StreamInfo> _outputs;
+
+    /**
+     * @param command the command handed to the {@link ShellBolt} constructor
+     * @param outputs the output streams to declare, keyed by stream id
+     */
+    public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+
+    /**
+     * Declares every stream in the outputs map. Direct streams use the
+     * three-argument {@code declareStream}; the others use the two-argument form.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> output : _outputs.entrySet()) {
+            String streamId = output.getKey();
+            StreamInfo info = output.getValue();
+            if (!info.is_direct()) {
+                declarer.declareStream(streamId, new Fields(info.get_output_fields()));
+                continue;
+            }
+            declarer.declareStream(streamId, true, new Fields(info.get_output_fields()));
+        }
+    }
+
+    /** Returns {@code null}: this bolt supplies no component configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
new file mode 100644
index 0000000..9fb7e73
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+/**
+ * A {@link ShellSpout} that also implements {@link IRichSpout} by declaring its
+ * output streams from a map of stream id to {@link StreamInfo} given at
+ * construction time.
+ */
+public class RichShellSpout extends ShellSpout implements IRichSpout {
+    /** Stream id to stream description, as passed to the constructor. */
+    private Map<String, StreamInfo> _outputs;
+
+    /**
+     * @param command the command handed to the {@link ShellSpout} constructor
+     * @param outputs the output streams to declare, keyed by stream id
+     */
+    public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
+        super(command);
+        _outputs = outputs;
+    }
+
+    /**
+     * Declares every stream in the outputs map. Direct streams use the
+     * three-argument {@code declareStream}; the others use the two-argument form.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, StreamInfo> output : _outputs.entrySet()) {
+            String streamId = output.getKey();
+            StreamInfo info = output.getValue();
+            if (!info.is_direct()) {
+                declarer.declareStream(streamId, new Fields(info.get_output_fields()));
+                continue;
+            }
+            declarer.declareStream(streamId, true, new Fields(info.get_output_fields()));
+        }
+    }
+
+    /** Returns {@code null}: this spout supplies no component configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
new file mode 100644
index 0000000..cb66fd0
--- /dev/null
+++ b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
@@ -0,0 +1 @@
+org.apache.storm.clojure.ClojureSerializationRegister

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/test/clj/org/apache/storm/tuple_test.clj b/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
new file mode 100644
index 0000000..a67022a
--- /dev/null
+++ b/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
@@ -0,0 +1,53 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.tuple-test
+  (:use [clojure test])
+  (:import [org.apache.storm Testing])
+  (:import [org.apache.storm.testing MkTupleParam])
+  (:import [org.apache.storm.tuple Tuple])
+  (:import [org.apache.storm.clojure ClojureTuple]))
+
+(deftest test-lookup
+  ;; Checks lookup by string name, by keyword, and with a not-found default.
+  (let [param (doto (MkTupleParam.) (.setFieldsList ["foo" "bar"]))
+        tuple (ClojureTuple. (Testing/testTuple [12 "hello"] param))]
+    (is (= 12 (tuple "foo")))
+    (is (= 12 (tuple :foo)))
+    (is (= 12 (:foo tuple)))
+    (is (= "hello" (:bar tuple)))
+    (is (= :notfound (tuple "404" :notfound)))))
+
+(deftest test-indexed
+  (let [tuple (ClojureTuple. (Testing/testTuple [12 "hello"] (doto (MkTupleParam.) (.setFieldsList ["foo" "bar"]))))]
+    (are [idx expected] (= expected (nth tuple idx)) ; one assertion per position/value row
+      0 12, 1 "hello")))
+
+(deftest test-seq ; seq of the tuple is expected to be its [field-name value] pairs, in field order
+  (let [wrapped (->> (doto (MkTupleParam.) (.setFieldsList ["foo" "bar"])) (Testing/testTuple [12 "hello"]) (ClojureTuple.))]
+    (is (= [["foo" 12] ["bar" "hello"]] (seq wrapped)))))
+
+(deftest test-map
+  (let [param (doto (MkTupleParam.) (.setFieldsList ["foo" "bar"]))
+        tuple (ClojureTuple. (Testing/testTuple [12 "hello"] param))]
+    ;; assoc with a string key and with a keyword key
+    (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple "foo" 42))))
+    (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple :foo 42))))
+    ;; dissoc with a string key and with a keyword key
+    (is (= {"bar" "hello"} (.getMap (dissoc tuple "foo"))))
+    (is (= {"bar" "hello"} (.getMap (dissoc tuple :foo))))
+    ;; chained updates mixing both key styles
+    (is (= {"foo" 42 "bar" "world"} (.getMap (-> tuple (assoc "foo" 42) (assoc :bar "world")))))))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/resources/log4j2-test.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/test/resources/log4j2-test.xml b/storm-clojure/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..e8ae19e
--- /dev/null
+++ b/storm-clojure/test/resources/log4j2-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Test logging setup: a single console appender; org.apache.zookeeper is limited to WARN. -->
+<configuration monitorInterval="60">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT" follow="true">
+            <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Logger name="org.apache.zookeeper" level="WARN"/>
+        <Root level="${env:LOG_LEVEL:-INFO}"> <!-- level read from the LOG_LEVEL environment variable, INFO as fallback -->
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/resources/test_runner.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/test/resources/test_runner.clj b/storm-clojure/test/resources/test_runner.clj
new file mode 100644
index 0000000..c10fec3
--- /dev/null
+++ b/storm-clojure/test/resources/test_runner.clj
@@ -0,0 +1,114 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testrunner)
+
+(import `java.util.Properties)
+(import `java.io.ByteArrayOutputStream)
+(import `java.io.FileInputStream)
+(import `java.io.FileOutputStream)
+(import `java.io.FileWriter)
+(import `java.io.File)
+(import `java.io.OutputStream)
+(import `java.io.OutputStreamWriter)
+(import `java.io.PrintStream)
+(import `java.io.PrintWriter)
+(use 'clojure.test)
+(use 'clojure.test.junit)
+
+;; Load the runner settings from the properties file named by the first argument;
+;; with-open closes the stream once the properties have been read.
+(def props (Properties.))
+(with-open [in (FileInputStream. (first *command-line-args*))]
+  (.load props in))
+
+;; Every property whose key starts with "ns." names a test namespace to run.
+(def namespaces
+  (vec (for [[key val] props :when (.startsWith key "ns.")] (symbol val))))
+
+(def output-dir (.get props "outputDir")) ; directory that receives the XML reports
+(doseq [ns namespaces] (require ns))      ; load every test namespace up front
+
+(.mkdirs (File. output-dir))
+
+;; Runs every namespace in `namespaces` through clojure.test, writing a
+;; JUnit-style XML report to <output-dir>/<ns>.xml, then exits with status 1
+;; if any test failed or errored and 0 otherwise.
+;;
+;; While a test var is running, System/out, System/err and *out* are pointed
+;; at the report file and wrapped in a <system-out><![CDATA[ ... ]]> element,
+;; so whatever the test prints ends up inside the report.
+(let [sys-out System/out
+      sys-err System/err
+      num-bad (atom 0)
+      original-junit-report junit-report
+      orig-out *out*]
+  (doseq [ns namespaces]
+    (with-open [out-stream (FileOutputStream. (str output-dir "/" ns ".xml"))
+                print-writer (PrintWriter. out-stream true)
+                print-stream (PrintStream. out-stream true)]
+      (.println sys-out (str "Running " ns))
+      (try
+        (let [in-sys-out (atom false)
+              ;; Ends an open capture: restores the real streams and closes
+              ;; the CDATA section. Does nothing when no capture is active.
+              end-capture! (fn []
+                             (when @in-sys-out
+                               (reset! in-sys-out false)
+                               (System/setOut sys-out)
+                               (System/setErr sys-err)
+                               (set! *out* orig-out)
+                               (with-test-out
+                                 (print "]]>")
+                                 (finish-element 'system-out true))))
+              ;; Starts a capture: opens the element and redirects output.
+              begin-capture! (fn []
+                               (reset! in-sys-out true)
+                               (with-test-out
+                                 (start-element 'system-out true)
+                                 (print "<![CDATA[") (flush))
+                               (System/setOut print-stream)
+                               (System/setErr print-stream)
+                               (set! *out* print-writer))]
+          (binding [*test-out* print-writer
+                    *out* orig-out
+                    junit-report (fn [data]
+                                   (let [type (data :type)]
+                                     (cond
+                                       ;; a new test var: close any capture left
+                                       ;; open, report the event, then capture
+                                       (= type :begin-test-var)
+                                       (do (end-capture!)
+                                           (original-junit-report data)
+                                           (begin-capture!))
+                                       ;; close the capture before these events
+                                       ;; are passed on to the original reporter
+                                       (contains? #{:end-test-var :fail :error} type)
+                                       (end-capture!))
+                                     ;; :begin-test-var was already reported above
+                                     (when-not (= type :begin-test-var)
+                                       (original-junit-report data))))]
+            (with-junit-output
+              (let [result (run-tests ns)]
+                (.println sys-out (str "Tests run: " (result :test) ", Passed: " (result :pass) ", Failures: " (result :fail) ", Errors: " (result :error)))
+                ;; swap! adds this namespace's failures and errors atomically
+                (swap! num-bad + (result :error) (result :fail))))))
+        (finally
+          ;; always hand the real streams back, even if the run threw
+          (System/setOut sys-out)
+          (System/setErr sys-err)))))
+  (shutdown-agents)
+  (System/exit (if (> @num-bad 0) 1 0)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 0014aa5..e9adaf8 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -38,6 +38,12 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.storm</groupId>
+            <artifactId>storm-clojure</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.storm</groupId>
             <artifactId>storm-server</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -282,9 +288,6 @@
             <groupId>metrics-clojure</groupId>
             <artifactId>metrics-clojure</artifactId>
         </dependency>
-        <!-- hamcrest-core dependency is shaded inside the mockito-all and junit depends on newer version of hamcrest-core.
-        To give higher precedence to classes from newer version of hamcrest-core, Junit has been placed above mockito.
-         -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -448,7 +451,7 @@
                         </goals>
                         <configuration>
                             <junitOutput>true</junitOutput>
-                            <testScript>test/resources/test_runner.clj</testScript>
+                            <testScript>../storm-clojure/test/resources/test_runner.clj</testScript>
                             <!-- argLine is set by JaCoCo for code coverage -->
                             <vmargs>-Xmx1536m ${argLine} ${test.extra.args}</vmargs>
                             <!-- Run clojure unit tests or all tests (including integration tests) depending on the profile enabled -->
@@ -477,188 +480,6 @@
                 </configuration>
             </plugin>
             <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
-                    <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
-                    <createDependencyReducedPom>true</createDependencyReducedPom>
-                    <minimizeJar>false</minimizeJar>
-                    <artifactSet>
-                        <includes>
-                            <include>ns-tracker:ns-tracker</include>
-                            <include>hiccup:hiccup</include>
-                            <include>ring:*</include>
-                            <include>compojure:compojure</include>
-                            <include>clj-time:clj-time</include>
-                            <include>org.clojure:math.numeric-tower</include>
-                            <include>org.clojure:tools.cli</include>
-                            <include>org.clojure:tools.logging</include>
-                            <include>org.clojure:tools.macro</include>
-                            <include>org.clojure:java.jmx</include>
-                            <include>clout:clout</include>
-                            <include>org.clojure:tools.namespace</include>
-                            <include>cheshire:cheshire</include>
-                            <include>org.clojure:core.incubator</include>
-                            <include>metrics-clojure:*</include>
-                        </includes>
-                    </artifactSet>
-                    <relocations>
-                        <relocation>
-                            <pattern>cheshire</pattern>
-                            <shadedPattern>org.apache.storm.shade.cheshire</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clojure.tools.logging</pattern>
-                            <shadedPattern>org.apache.storm.shade.clojure.tools.logging</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clojure.core.incubator</pattern>
-                            <shadedPattern>org.apache.storm.shade.clojure.core.incubator</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clojure.tools.namespace</pattern>
-                            <shadedPattern>org.apache.storm.shade.clojure.tools.namespace</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clout</pattern>
-                            <shadedPattern>org.apache.storm.shade.clout</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>compojure</pattern>
-                            <shadedPattern>org.apache.storm.shade.compojure</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>ns_tracker</pattern>
-                            <shadedPattern>org.apache.storm.shade.ns_tracker</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>ns-tracker</pattern>
-                            <shadedPattern>org.apache.storm.shade.ns-tracker</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>hiccup</pattern>
-                            <shadedPattern>org.apache.storm.shade.hiccup</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>ring</pattern>
-                            <shadedPattern>org.apache.storm.shade.ring</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clj_time</pattern>
-                            <shadedPattern>org.apache.storm.shade.clj_time</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>clj-time</pattern>
-                            <shadedPattern>org.apache.storm.shade.clj-time</shadedPattern>
-                        </relocation>
-                        <relocation>
-                          <pattern>clojure.math</pattern>
-                          <shadedPattern>org.apache.storm.shade.clojure.math</shadedPattern>
-                        </relocation>
-                        <relocation>
-                          <pattern>clojure.tools.cli</pattern>
-                          <shadedPattern>org.apache.storm.shade.clojure.tools.cli</shadedPattern>
-                        </relocation>
-                        <relocation>
-                          <pattern>cljs.tools.cli</pattern>
-                          <shadedPattern>org.apache.storm.shade.cljs.tools.cli</shadedPattern>
-                        </relocation>
-                        <relocation>
-                          <pattern>clojure.tools.macro</pattern>
-                          <shadedPattern>org.apache.storm.shade.clojure.tools.macro</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.core</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.counters</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.counters</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.gauges</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.gauges</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.histograms</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.histograms</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.meters</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.meters</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.reporters</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.reporters</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.timers</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.timers</shadedPattern>
-                        </relocation>
-                        <relocation>
-                            <pattern>metrics.utils</pattern>
-                            <shadedPattern>org.apache.storm.shade.metrics.utils</shadedPattern>
-                        </relocation>
-                    </relocations>
-                    <transformers>
-                        <transformer implementation="org.apache.storm.maven.shade.clojure.ClojureTransformer" />
-                    </transformers>
-                    <filters>
-                        <!-- Several of these filters remove the .clj files from the shaded dependencies, even though only .clj files are in these jars.
-                             The reason for this is a bit complex, but intentional.  During the build process all of the dependency .clj files are
-                             compiled down into .class files, and included in storm-core.jar.  The regular shade transformer handles these in 
-                             the majority of cases correctly.  However, the Clojure-Transformer does not shade everything correctly all the
-                             time.  Instead of spending a lot of time to get the Clojure-Transformer to parse Clojure correctly we opted to remove
-                             the .clj files from the uber jar. -->
-                        <filter><artifact>metrics-clojure:*</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:core.incubator</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>cheshire:cheshire</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:tools.logging</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:tools.namespace</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:math.numeric-tower</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:tools.macro</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>org.clojure:tools.cli</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>ns-tracker:ns-tracker</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>clout:clout</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>hiccup:hiccup</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>clj-time:clj-time</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>ring:*</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter><artifact>compojure:compojure</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
-                        <filter>
-                            <artifact>*:*</artifact>
-                            <excludes>
-                                <exclude>META-INF/*.SF</exclude>
-                                <exclude>META-INF/*.sf</exclude>
-                                <exclude>META-INF/*.DSA</exclude>
-                                <exclude>META-INF/*.dsa</exclude>
-                                <exclude>META-INF/*.RSA</exclude>
-                                <exclude>META-INF/*.rsa</exclude>
-                                <exclude>META-INF/*.EC</exclude>
-                                <exclude>META-INF/*.ec</exclude>
-                                <exclude>META-INF/MSFTSIG.SF</exclude>
-                                <exclude>META-INF/MSFTSIG.RSA</exclude>
-                            </excludes>
-                        </filter>
-                    </filters>
-                </configuration>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.storm</groupId>
-                        <artifactId>maven-shade-clojure-transformer</artifactId>
-                        <version>${project.version}</version>
-                    </dependency>
-                </dependencies>
-            </plugin>
-            <plugin>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-maven-plugins</artifactId>
                <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
deleted file mode 100644
index bf7cd12..0000000
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.config
-  (:import [org.apache.storm DaemonConfig Config])
-  (:import [org.apache.storm.validation ConfigValidation]))
-
-(defn- clojure-config-name [name]
-  (.replace (.toUpperCase name) "_" "-"))
-
-; define clojure constants for every configuration parameter
-(doseq [f (seq (.getDeclaredFields DaemonConfig))]
-  (when (ConfigValidation/isFieldAllowed f)
-    (let [name (.getName f)
-          new-name (clojure-config-name name)]
-      (eval
-        `(def ~(symbol new-name) (. DaemonConfig ~(symbol name)))))))
-
-(doseq [f (seq (.getDeclaredFields Config))]
-  (when (ConfigValidation/isFieldAllowed f)
-    (let [name (.getName f)
-          new-name (clojure-config-name name)]
-      (eval
-        `(def ~(symbol new-name) (. Config ~(symbol name)))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 05c887b..8720fca 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -18,7 +18,7 @@
   (:use [clojure.set :only [difference intersection]])
   (:use [clojure.string :only [blank? split]])
   (:use [hiccup core page-helpers form-helpers])
-  (:use [org.apache.storm config util log])
+  (:use [org.apache.storm config daemon-config util log])
   (:use [org.apache.storm.ui helpers])
   (:import [org.apache.storm StormTimer]
            [org.apache.storm.daemon.supervisor SupervisorUtils]

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/daemon_config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon_config.clj b/storm-core/src/clj/org/apache/storm/daemon_config.clj
new file mode 100644
index 0000000..9d74c4f
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon_config.clj
@@ -0,0 +1,30 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.daemon-config
+  (:import [org.apache.storm DaemonConfig])
+  (:import [org.apache.storm.validation ConfigValidation]))
+
+(defn- clojure-config-name [name] ; e.g. "storm_local_dir" -> "STORM-LOCAL-DIR"
+  (-> name .toUpperCase (.replace "_" "-")))
+
+;; Define a Clojure var (upper-case, dash-separated name) for each allowed DaemonConfig field.
+(doseq [field (.getDeclaredFields DaemonConfig)
+        :when (ConfigValidation/isFieldAllowed field)]
+  (let [java-name (.getName field)
+        clj-name  (symbol (clojure-config-name java-name))]
+    ;; the def form is assembled with syntax-quote and evaluated at load time
+    (eval `(def ~clj-name (. DaemonConfig ~(symbol java-name))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/internal/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
deleted file mode 100644
index f27ac04..0000000
--- a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
+++ /dev/null
@@ -1,204 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.internal.clojure
-  (:use [org.apache.storm util])
-  (:import [org.apache.storm StormSubmitter])
-  (:import [org.apache.storm.generated StreamInfo])
-  (:import [org.apache.storm.tuple Tuple])
-  (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
-  (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
-  (:import [java.util Collection List])
-  (:require [org.apache.storm.internal [thrift :as thrift]]))
-
-(defn direct-stream [fields]
-  (StreamInfo. fields true))
-
-(defn to-spec [avar]
-  (let [m (meta avar)]
-    [(str (:ns m)) (str (:name m))]))
-
-(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
-  (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
-
-(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
-  `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
-
-(defn clojure-spout* [output-spec fn-var conf-var args]
-  (let [m (meta fn-var)]
-    (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
-    ))
-
-(defmacro clojure-spout [output-spec fn-sym conf-sym args]
-  `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
-
-(defn normalize-fns [body]
-  (for [[name args & impl] body
-        :let [args (-> "this"
-                       gensym
-                       (cons args)
-                       vec)]]
-    (concat [name args] impl)
-    ))
-
-(defmacro bolt [& body]
-  (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
-        fns (normalize-fns bolt-fns)]
-    `(reify IBolt
-       ~@fns
-       ~@other-fns)))
-
-(defmacro bolt-execute [& body]
-  `(bolt
-     (~'execute ~@body)))
-
-(defmacro spout [& body]
-  (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
-        fns (normalize-fns spout-fns)]
-    `(reify ISpout
-       ~@fns
-       ~@other-fns)))
-
-(defmacro defbolt [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defbolt ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          fn-body (if (:prepare opts)
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (nth args 1)
-                          args (vec (take 1 args))
-                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
-                      `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
-                    `(def ~name
-                       (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-(defmacro defspout [name output-spec & [opts & impl :as all]]
-  (if-not (map? opts)
-    `(defspout ~name ~output-spec {} ~@all)
-    (let [worker-name (symbol (str name "__"))
-          conf-fn-name (symbol (str name "__conf__"))
-          params (:params opts)
-          conf-code (:conf opts)
-          prepare? (:prepare opts)
-          prepare? (if (nil? prepare?) true prepare?)
-          fn-body (if prepare?
-                    (cons 'fn impl)
-                    (let [[args & impl-body] impl
-                          coll-sym (first args)
-                          prepargs [(gensym "conf") (gensym "context") coll-sym]]
-                      `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
-          definer (if params
-                    `(defn ~name [& args#]
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
-                    `(def ~name
-                       (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
-                    )
-          ]
-      `(do
-         (defn ~conf-fn-name ~(if params params [])
-           ~conf-code
-           )
-         (defn ~worker-name ~(if params params [])
-           ~fn-body
-           )
-         ~definer
-         ))))
-
-(defprotocol TupleValues
-  (tuple-values [values collector stream]))
-
-(extend-protocol TupleValues
-  java.util.Map
-  (tuple-values [this collector ^String stream]
-    (let [^TopologyContext context (:context collector)
-          fields (..  context (getThisOutputFields stream) toList) ]
-      (vec (map (into
-                  (empty this) (for [[k v] this]
-                                   [(if (keyword? k) (name k) k) v]))
-                fields))))
-  java.util.List
-  (tuple-values [this collector stream]
-    this))
-
-(defn- collectify
-  [obj]
-  (if (or (sequential? obj) (instance? Collection obj))
-    obj
-    [obj]))
-
-(defnk emit-bolt! [collector values
-                   :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchor (collectify anchor)
-        values (tuple-values values collector stream) ]
-    (.emit ^OutputCollector (:output-collector collector) stream anchor values)
-    ))
-
-(defnk emit-direct-bolt! [collector task values
-                          :stream Utils/DEFAULT_STREAM_ID :anchor []]
-  (let [^List anchor (collectify anchor)
-        values (tuple-values values collector stream) ]
-    (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
-    ))
-
-(defn ack! [collector ^Tuple tuple]
-  (.ack ^OutputCollector (:output-collector collector) tuple))
-
-(defn fail! [collector ^Tuple tuple]
-  (.fail ^OutputCollector (:output-collector collector) tuple))
-
-(defn reset-timeout! [collector ^Tuple tuple]
-  (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
-
-(defn report-error! [collector ^Tuple tuple]
-  (.reportError ^OutputCollector (:output-collector collector) tuple))
-
-(defnk emit-spout! [collector values
-                    :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [values (tuple-values values collector stream)]
-    (.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
-
-(defnk emit-direct-spout! [collector task values
-                           :stream Utils/DEFAULT_STREAM_ID :id nil]
-  (let [values (tuple-values values collector stream)]
-    (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
-
-(defn submit-remote-topology [name conf topology]
-  (StormSubmitter/submitTopology name conf topology))
-
-(defn local-cluster []
-  ;; do this to avoid a cyclic dependency of
-  ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
-  (eval '(new org.apache.storm.LocalCluster)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/log.clj b/storm-core/src/clj/org/apache/storm/log.clj
deleted file mode 100644
index 7a006ef..0000000
--- a/storm-core/src/clj/org/apache/storm/log.clj
+++ /dev/null
@@ -1,34 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.log
-  (:require [clojure.tools.logging :as log]))
-
-(defmacro log-message
-  [& args]
-  `(log/info (str ~@args)))
-
-(defmacro log-error
-  [e & args]
-  `(log/log :error ~e (str ~@args)))
-
-(defmacro log-debug
-  [& args]
-  `(log/debug (str ~@args)))
-
-(defmacro log-warn
-  [& args]
-  `(log/warn (str ~@args)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 02ee449..2a07fdc 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -24,7 +24,7 @@
         ring.middleware.multipart-params.temp-file)
   (:use [ring.middleware.json :only [wrap-json-params]])
   (:use [hiccup core page-helpers])
-  (:use [org.apache.storm config util log])
+  (:use [org.apache.storm config daemon-config util log])
   (:use [org.apache.storm.ui helpers])
   (:import [org.apache.storm.utils Time]
            [org.apache.storm.generated NimbusSummary]

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj
deleted file mode 100644
index 9ad1f10..0000000
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ /dev/null
@@ -1,134 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.util
-  (:import [java.util Map List HashMap])
-  (:import [org.apache.storm.generated ErrorInfo])
-  (:import [org.apache.storm.utils Utils])
-  (:import [java.util List Set])
-  (:use [clojure walk])
-  (:use [org.apache.storm log]))
-
-;; name-with-attributes by Konrad Hinsen:
-(defn name-with-attributes
-  "To be used in macro definitions.
-  Handles optional docstrings and attribute maps for a name to be defined
-  in a list of macro arguments. If the first macro argument is a string,
-  it is added as a docstring to name and removed from the macro argument
-  list. If afterwards the first macro argument is a map, its entries are
-  added to the name's metadata map and the map is removed from the
-  macro argument list. The return value is a vector containing the name
-  with its extended metadata map and the list of unprocessed macro
-  arguments."
-  [name macro-args]
-  (let [[docstring macro-args] (if (string? (first macro-args))
-                                 [(first macro-args) (next macro-args)]
-                                 [nil macro-args])
-        [attr macro-args] (if (map? (first macro-args))
-                            [(first macro-args) (next macro-args)]
-                            [{} macro-args])
-        attr (if docstring
-               (assoc attr :doc docstring)
-               attr)
-        attr (if (meta name)
-               (conj (meta name) attr)
-               attr)]
-    [(with-meta name attr) macro-args]))
-
-(defmacro defnk
-  "Define a function accepting keyword arguments. Symbols up to the first
-  keyword in the parameter list are taken as positional arguments.  Then
-  an alternating sequence of keywords and defaults values is expected. The
-  values of the keyword arguments are available in the function body by
-  virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
-  defnk accepts an optional docstring as well as an optional metadata map."
-  [fn-name & fn-tail]
-  (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
-        [pos kw-vals] (split-with symbol? args)
-        syms (map #(-> % name symbol) (take-nth 2 kw-vals))
-        values (take-nth 2 (rest kw-vals))
-        sym-vals (apply hash-map (interleave syms values))
-        de-map {:keys (vec syms) :or sym-vals}]
-    `(defn ~fn-name
-       [~@pos & options#]
-       (let [~de-map (apply hash-map options#)]
-         ~@body))))
-
-(defmacro thrown-cause?
-  [klass & body]
-  `(try
-     ~@body
-     false
-     (catch Throwable t#
-       (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
-         (if (not tc#) (log-error t# "Exception did not match " ~klass))
-         tc#))))
-
-(defn clojurify-structure
-  [s]
-  (if s
-    (prewalk (fn [x]
-             (cond (instance? Map x) (into {} x)
-                   (instance? List x) (vec x)
-                   (instance? Set x) (into #{} x)
-                   ;; (Boolean. false) does not evaluate to false in an if.
-                   ;; This fixes that.
-                   (instance? Boolean x) (boolean x)
-                   true x))
-           s)))
-; move this func form convert.clj due to cyclic load dependency
-(defn clojurify-error [^ErrorInfo error]
-  (if error
-    {
-      :error (.get_error error)
-      :time-secs (.get_error_time_secs error)
-      :host (.get_host error)
-      :port (.get_port error)
-      }
-    ))
-
-;TODO: We're keeping this function around until all the code using it is properly tranlated to java
-;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
-(defn map-val
-  [afn amap]
-  (into {}
-        (for [[k v] amap]
-          [k (afn v)])))
-
-;TODO: We're keeping this function around until all the code using it is properly tranlated to java
-;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
-(defn filter-key
-  [afn amap]
-  (into {} (filter (fn [[k v]] (afn k)) amap)))
-
-;TODO: Once all the other clojure functions (100+ locations) are translated to java, this function becomes moot.
-(def not-nil? (complement nil?))
-
-(defmacro dofor [& body]
-  `(doall (for ~@body)))
-
-(defmacro -<>
-  ([x] x)
-  ([x form] (if (seq? form)
-              (with-meta
-                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
-                  (concat begin [x] end))
-                (meta form))
-              (list form x)))
-  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
-
-(defn hashmap-to-persistent [^HashMap m]
-  (zipmap (.keySet m) (.values m)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 7342070..c5346d7 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -22,7 +22,7 @@
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
   (:import [org.apache.storm.utils Time])
   (:import [org.apache.storm.tuple Fields])
-  (:use [org.apache.storm.internal clojure])
+  (:use [org.apache.storm clojure])
   (:use [org.apache.storm config util])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils]))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 3c6c58a..87e1fc0 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,8 +15,8 @@
 ;; limitations under the License.
 (ns integration.org.apache.storm.testing4j-test
   (:use [clojure.test])
-  (:use [org.apache.storm config util])
-  (:use [org.apache.storm.internal clojure])
+  (:use [org.apache.storm daemon-config config util])
+  (:use [org.apache.storm clojure])
   (:require [integration.org.apache.storm.integration-test :as it])
   (:require [org.apache.storm.internal.thrift :as thrift])
   (:import [org.apache.storm Testing Config]

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index b0cd62b..67e18c1 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -30,7 +30,7 @@
   (:import [org.apache.storm Thrift])
   (:import [org.mockito ArgumentCaptor Mockito Matchers])
   (:use [org.apache.storm config])
-  (:use [org.apache.storm.internal clojure])
+  (:use [org.apache.storm clojure])
   (:use [conjure core]))
 
 (defbolt exclamation-bolt ["result" "return-info"] [tuple collector]

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index f138e75..9bfb8a7 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -19,7 +19,7 @@
            [org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
   (:import [org.apache.storm.grouping LoadMapping])
   (:use [org.apache.storm log config])
-  (:use [org.apache.storm.internal clojure])
+  (:use [org.apache.storm clojure])
   (:import [org.apache.storm LocalCluster$Builder Testing Thrift])
   (:import [org.apache.storm.utils Utils]
            (org.apache.storm.daemon GrouperFactory)))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index 4410af0..ae9b651 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.logviewer-test
-  (:use [org.apache.storm config util])
+  (:use [org.apache.storm daemon-config config util])
   (:require [org.apache.storm.daemon [logviewer :as logviewer]])
   (:require [conjure.core])
   (:use [clojure test])

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 6903cc1..4221d0b 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -26,7 +26,7 @@
   (:import [org.apache.storm Testing Testing$Condition LocalCluster$Builder])
   
   (:use [org.apache.storm config])
-  (:use [org.apache.storm.internal clojure])
+  (:use [org.apache.storm clojure])
   (:use [org.apache.storm.util])
   (:import [org.apache.storm Thrift])
   (:import [org.apache.storm.utils Utils]

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 39c675e..e91a9b0 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -47,7 +47,7 @@
   (:import [org.json.simple JSONValue])
   (:import [org.apache.storm.daemon StormCommon])
   (:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
-  (:use [org.apache.storm util config log])
+  (:use [org.apache.storm util daemon-config config log])
   (:require [conjure.core])
 
   (:use [conjure core]))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 1c3d94a..339d5b3 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.scheduler.multitenant-scheduler-test
   (:use [clojure test])
-  (:use [org.apache.storm config log])
+  (:use [org.apache.storm daemon-config config log])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus])
   (:import [org.apache.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index fc95097..9133fd9 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -38,7 +38,7 @@
   (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
             ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
   (:import [org.apache.storm.daemon StormCommon])
-  (:use [org.apache.storm util config])
+  (:use [org.apache.storm util daemon-config config])
   (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions
             KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor]
            (org.json.simple JSONValue))

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index ecdc337..8476e7e 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -29,7 +29,7 @@
   (:import [org.apache.storm.utils ConfigUtils Utils])
   (:import [org.apache.storm.cluster IStormClusterState])
   (:import [org.mockito Mockito Matchers])
-  (:use [org.apache.storm util config log])
+  (:use [org.apache.storm util config daemon-config log])
   (:require [conjure.core])
   (:use [conjure core]))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/transactional_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/transactional_test.clj b/storm-core/test/clj/org/apache/storm/transactional_test.clj
index 358a4ba..e84efbc 100644
--- a/storm-core/test/clj/org/apache/storm/transactional_test.clj
+++ b/storm-core/test/clj/org/apache/storm/transactional_test.clj
@@ -37,7 +37,7 @@
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [java.util HashMap Collections ArrayList])
   (:use [org.apache.storm util config log])
-  (:use [org.apache.storm.internal clojure]))
+  (:use [org.apache.storm clojure]))
 
 ;; Testing TODO:
 ;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)

http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/tuple_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/tuple_test.clj b/storm-core/test/clj/org/apache/storm/tuple_test.clj
deleted file mode 100644
index 5d5dea7..0000000
--- a/storm-core/test/clj/org/apache/storm/tuple_test.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.tuple-test
-  (:use [clojure test])
-  (:import [org.apache.storm Testing])
-  (:import [org.apache.storm.testing MkTupleParam])
-  (:import [org.apache.storm.tuple Tuple]))
-
-(deftest test-lookup
-  (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
-    (is (= 12 (tuple "foo")))
-    (is (= 12 (tuple :foo)))
-    (is (= 12 (:foo tuple)))
-
-    (is (= "hello" (:bar tuple)))
-    
-    (is (= :notfound (tuple "404" :notfound)))))
-
-(deftest test-indexed
-  (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
-    (is (= 12 (nth tuple 0)))
-    (is (= "hello" (nth tuple 1)))))
-
-(deftest test-seq
-  (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
-    (is (= [["foo" 12] ["bar" "hello"]] (seq tuple)))))
-
-(deftest test-map
-    (let [tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
-      (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple "foo" 42))))
-      (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple :foo 42))))
-
-      (is (= {"bar" "hello"} (.getMap (dissoc tuple "foo"))))
-      (is (= {"bar" "hello"} (.getMap (dissoc tuple :foo))))
-
-      (is (= {"foo" 42 "bar" "world"} (.getMap (assoc 
-                                        (assoc tuple "foo" 42)
-                                        :bar "world"))))))
-