You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/28 01:44:28 UTC
[2/6] storm git commit: STORM-2468: Remove clojure from storm-client
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
new file mode 100644
index 0000000..8de15eb
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureTuple.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+
+import clojure.lang.AFn;
+import clojure.lang.ASeq;
+import clojure.lang.ArityException;
+import clojure.lang.Counted;
+import clojure.lang.IFn;
+import clojure.lang.ILookup;
+import clojure.lang.IMapEntry;
+import clojure.lang.IMeta;
+import clojure.lang.IPersistentCollection;
+import clojure.lang.IPersistentMap;
+import clojure.lang.ISeq;
+import clojure.lang.Indexed;
+import clojure.lang.Keyword;
+import clojure.lang.MapEntry;
+import clojure.lang.Obj;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.Seqable;
+import clojure.lang.Symbol;
+
+public class ClojureTuple extends TupleImpl implements Seqable, Indexed, IMeta, ILookup, IPersistentMap, Map, IFn {
+ private IPersistentMap _meta;
+ private IPersistentMap _map;
+
+ public ClojureTuple(Tuple t) {
+ super(t);
+ }
+
+ private Keyword makeKeyword(String name) {
+ return Keyword.intern(Symbol.create(name));
+ }
+
+ private PersistentArrayMap toMap() {
+ Object array[] = new Object[size()*2];
+ List<String> fields = getFields().toList();
+ for(int i=0; i < size(); i++) {
+ array[i*2] = fields.get(i);
+ array[(i*2)+1] = getValue(i);
+ }
+ return new PersistentArrayMap(array);
+ }
+
+ public IPersistentMap getMap() {
+ if (_map == null) {
+ _map = toMap();
+ }
+ return _map;
+ }
+
+ /* ILookup */
+ @Override
+ public Object valAt(Object o) {
+ try {
+ if(o instanceof Keyword) {
+ return getValueByField(((Keyword) o).getName());
+ } else if(o instanceof String) {
+ return getValueByField((String) o);
+ }
+ } catch(IllegalArgumentException ignored) {
+ }
+ return null;
+ }
+
+ @Override
+ public Object valAt(Object o, Object def) {
+ Object ret = valAt(o);
+ if (ret==null) ret = def;
+ return ret;
+ }
+
+ /* Seqable */
+ @Override
+ public ISeq seq() {
+ if(size() > 0) {
+ return new Seq(getFields().toList(), getValues(), 0);
+ }
+ return null;
+ }
+
+ static class Seq extends ASeq implements Counted {
+ private static final long serialVersionUID = 1L;
+ final List<String> fields;
+ final List<Object> values;
+ final int i;
+
+ Seq(List<String> fields, List<Object> values, int i) {
+ this.fields = fields;
+ this.values = values;
+ assert i >= 0;
+ this.i = i;
+ }
+
+ public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
+ super(meta);
+ this.fields= fields;
+ this.values = values;
+ assert i >= 0;
+ this.i = i;
+ }
+
+ @Override
+ public Object first() {
+ return new MapEntry(fields.get(i), values.get(i));
+ }
+
+ @Override
+ public ISeq next() {
+ if(i+1 < fields.size()) {
+ return new Seq(fields, values, i+1);
+ }
+ return null;
+ }
+
+ @Override
+ public int count() {
+ assert fields.size() -i >= 0 : "index out of bounds";
+ // i being the position in the fields of this seq, the remainder of the seq is the size
+ return fields.size() -i;
+ }
+
+ @Override
+ public Obj withMeta(IPersistentMap meta) {
+ return new Seq(meta, fields, values, i);
+ }
+ }
+
+ /* Indexed */
+ public Object nth(int i) {
+ if (i < size()) {
+ return getValue(i);
+ } else {
+ return null;
+ }
+ }
+
+ public Object nth(int i, Object notfound) {
+ Object ret = nth(i);
+ if (ret==null) ret = notfound;
+ return ret;
+ }
+
+ /* Counted */
+ public int count() {
+ return size();
+ }
+
+ /* IMeta */
+ public IPersistentMap meta() {
+ if(_meta==null) {
+ _meta = new PersistentArrayMap( new Object[] {
+ makeKeyword("stream"), getSourceStreamId(),
+ makeKeyword("component"), getSourceComponent(),
+ makeKeyword("task"), getSourceTask()});
+ }
+ return _meta;
+ }
+
+ /* IFn */
+ @Override
+ public Object invoke(Object o) {
+ return valAt(o);
+ }
+
+ @Override
+ public Object invoke(Object o, Object notfound) {
+ return valAt(o, notfound);
+ }
+
+ @Override
+ public Object invoke() {
+ throw new ArityException(0, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3) {
+ throw new ArityException(3, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4) {
+ throw new ArityException(4, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5) {
+ throw new ArityException(5, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6) {
+ throw new ArityException(6, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7) {
+ throw new ArityException(7, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8) {
+ throw new ArityException(8, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9) {
+ throw new ArityException(9, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10) {
+ throw new ArityException(10, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11) {
+ throw new ArityException(11, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12) {
+ throw new ArityException(12, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13) {
+ throw new ArityException(13, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14) {
+ throw new ArityException(14, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15) {
+ throw new ArityException(15, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16) {
+ throw new ArityException(16, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16, Object arg17) {
+ throw new ArityException(17, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16, Object arg17, Object arg18) {
+ throw new ArityException(18, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16, Object arg17, Object arg18, Object arg19) {
+ throw new ArityException(19, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20) {
+ throw new ArityException(20, "1 or 2 args only");
+ }
+
+ @Override
+ public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
+ Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
+ Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20, Object... args) {
+ throw new ArityException(21, "1 or 2 args only");
+ }
+
+ @Override
+ public Object applyTo(ISeq arglist) {
+ return AFn.applyToHelper(this, arglist);
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return invoke();
+ }
+
+ @Override
+ public void run() {
+ invoke();
+ }
+
+ /* IPersistentMap */
+ /* Naive implementation, but it might be good enough */
+ @Override
+ public IPersistentMap assoc(Object k, Object v) {
+ if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assoc(k, v));
+ }
+
+ @Override
+ public IPersistentMap assocEx(Object k, Object v) {
+ if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assocEx(k, v));
+ }
+
+ @Override
+ public IPersistentMap without(Object k) {
+ if(k instanceof Keyword) return without(((Keyword) k).getName());
+
+ return new IndifferentAccessMap(getMap().without(k));
+ }
+
+ @Override
+ public boolean containsKey(Object k) {
+ if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+ return getMap().containsKey(k);
+ }
+
+ @Override
+ public IMapEntry entryAt(Object k) {
+ if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+
+ return getMap().entryAt(k);
+ }
+
+ @Override
+ public IPersistentCollection cons(Object o) {
+ return getMap().cons(o);
+ }
+
+ @Override
+ public IPersistentCollection empty() {
+ return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
+ }
+
+ @Override
+ public boolean equiv(Object o) {
+ return getMap().equiv(o);
+ }
+
+ @Override
+ public Iterator iterator() {
+ return getMap().iterator();
+ }
+
+ /* Map */
+ @Override
+ public boolean containsValue(Object v) {
+ return ((Map) getMap()).containsValue(v);
+ }
+
+ @Override
+ public Set entrySet() {
+ return ((Map) getMap()).entrySet();
+ }
+
+ @Override
+ public Object get(Object k) {
+ return valAt(k);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return ((Map) getMap()).isEmpty();
+ }
+
+ @Override
+ public Set keySet() {
+ return ((Map) getMap()).keySet();
+ }
+
+ @Override
+ public Collection values() {
+ return ((Map) getMap()).values();
+ }
+
+ /* Not implemented */
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object put(Object k, Object v) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putAll(Map m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object remove(Object k) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
new file mode 100644
index 0000000..0bfe9fb
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/ClojureUtil.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import clojure.lang.RT;
+
+public class ClojureUtil {
+ public static synchronized clojure.lang.IFn loadClojureFn(String namespace, String name) {
+ try {
+ clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+ } catch (Exception e) {
+ //if playing from the repl and defining functions, file won't exist
+ }
+ return (clojure.lang.IFn) RT.var(namespace, name).deref();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
new file mode 100644
index 0000000..b30788a
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/IndifferentAccessMap.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import clojure.lang.ILookup;
+import clojure.lang.ISeq;
+import clojure.lang.AFn;
+import clojure.lang.IPersistentMap;
+import clojure.lang.PersistentArrayMap;
+import clojure.lang.IMapEntry;
+import clojure.lang.IPersistentCollection;
+import clojure.lang.Keyword;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collection;
+import java.util.Set;
+
+public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap, Map {
+
+ protected IPersistentMap _map;
+
+ protected IndifferentAccessMap() {
+ }
+
+ public IndifferentAccessMap(IPersistentMap map) {
+ setMap(map);
+ }
+
+ public IPersistentMap getMap() {
+ return _map;
+ }
+
+ public IPersistentMap setMap(IPersistentMap map) {
+ _map = map;
+ return _map;
+ }
+
+ public int size() {
+ return ((Map) getMap()).size();
+ }
+
+ public int count() {
+ return size();
+ }
+
+ public ISeq seq() {
+ return getMap().seq();
+ }
+
+ @Override
+ public Object valAt(Object o) {
+ if(o instanceof Keyword) {
+ return valAt(((Keyword) o).getName());
+ }
+ return getMap().valAt(o);
+ }
+
+ @Override
+ public Object valAt(Object o, Object def) {
+ Object ret = valAt(o);
+ if(ret==null) ret = def;
+ return ret;
+ }
+
+ /* IFn */
+ @Override
+ public Object invoke(Object o) {
+ return valAt(o);
+ }
+
+ @Override
+ public Object invoke(Object o, Object notfound) {
+ return valAt(o, notfound);
+ }
+
+ /* IPersistentMap */
+ /* Naive implementation, but it might be good enough */
+ public IPersistentMap assoc(Object k, Object v) {
+ if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assoc(k, v));
+ }
+
+ public IPersistentMap assocEx(Object k, Object v) {
+ if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+
+ return new IndifferentAccessMap(getMap().assocEx(k, v));
+ }
+
+ public IPersistentMap without(Object k) {
+ if(k instanceof Keyword) return without(((Keyword) k).getName());
+
+ return new IndifferentAccessMap(getMap().without(k));
+ }
+
+ public boolean containsKey(Object k) {
+ if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+ return getMap().containsKey(k);
+ }
+
+ public IMapEntry entryAt(Object k) {
+ if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+
+ return getMap().entryAt(k);
+ }
+
+ public IPersistentCollection cons(Object o) {
+ return getMap().cons(o);
+ }
+
+ public IPersistentCollection empty() {
+ return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
+ }
+
+ public boolean equiv(Object o) {
+ return getMap().equiv(o);
+ }
+
+ public Iterator iterator() {
+ return getMap().iterator();
+ }
+
+ /* Map */
+ public boolean containsValue(Object v) {
+ return ((Map) getMap()).containsValue(v);
+ }
+
+ public Set entrySet() {
+ return ((Map) getMap()).entrySet();
+ }
+
+ public Object get(Object k) {
+ return valAt(k);
+ }
+
+ public boolean isEmpty() {
+ return ((Map) getMap()).isEmpty();
+ }
+
+ public Set keySet() {
+ return ((Map) getMap()).keySet();
+ }
+
+ public Collection values() {
+ return ((Map) getMap()).values();
+ }
+
+ /* Not implemented */
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+ public Object put(Object k, Object v) {
+ throw new UnsupportedOperationException();
+ }
+ public void putAll(Map m) {
+ throw new UnsupportedOperationException();
+ }
+ public Object remove(Object k) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
new file mode 100644
index 0000000..6de5637
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellBolt.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+public class RichShellBolt extends ShellBolt implements IRichBolt {
+ private Map<String, StreamInfo> _outputs;
+
+ public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
+ super(command);
+ _outputs = outputs;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for(String stream: _outputs.keySet()) {
+ StreamInfo def = _outputs.get(stream);
+ if(def.is_direct()) {
+ declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
+ } else {
+ declarer.declareStream(stream, new Fields(def.get_output_fields()));
+ }
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
new file mode 100644
index 0000000..9fb7e73
--- /dev/null
+++ b/storm-clojure/src/main/java/org/apache/storm/clojure/RichShellSpout.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.clojure;
+
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import java.util.Map;
+
+public class RichShellSpout extends ShellSpout implements IRichSpout {
+ private Map<String, StreamInfo> _outputs;
+
+ public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
+ super(command);
+ _outputs = outputs;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for(String stream: _outputs.keySet()) {
+ StreamInfo def = _outputs.get(stream);
+ if(def.is_direct()) {
+ declarer.declareStream(stream, true, new Fields(def.get_output_fields()));
+ } else {
+ declarer.declareStream(stream, new Fields(def.get_output_fields()));
+ }
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
----------------------------------------------------------------------
diff --git a/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
new file mode 100644
index 0000000..cb66fd0
--- /dev/null
+++ b/storm-clojure/src/main/resources/META-INF/services/org.apache.storm.serialization.SerializationRegister
@@ -0,0 +1 @@
+org.apache.storm.clojure.ClojureSerializationRegister
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/test/clj/org/apache/storm/tuple_test.clj b/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
new file mode 100644
index 0000000..a67022a
--- /dev/null
+++ b/storm-clojure/test/clj/org/apache/storm/tuple_test.clj
@@ -0,0 +1,53 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.tuple-test
+ (:use [clojure test])
+ (:import [org.apache.storm Testing])
+ (:import [org.apache.storm.testing MkTupleParam])
+ (:import [org.apache.storm.tuple Tuple])
+ (:import [org.apache.storm.clojure ClojureTuple]))
+
+(deftest test-lookup
+ (let [ tuple (ClojureTuple. (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"])))) ]
+ (is (= 12 (tuple "foo")))
+ (is (= 12 (tuple :foo)))
+ (is (= 12 (:foo tuple)))
+
+ (is (= "hello" (:bar tuple)))
+
+ (is (= :notfound (tuple "404" :notfound)))))
+
+(deftest test-indexed
+ (let [ tuple (ClojureTuple. (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"])))) ]
+ (is (= 12 (nth tuple 0)))
+ (is (= "hello" (nth tuple 1)))))
+
+(deftest test-seq
+ (let [ tuple (ClojureTuple. (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"])))) ]
+ (is (= [["foo" 12] ["bar" "hello"]] (seq tuple)))))
+
+(deftest test-map
+ (let [tuple (ClojureTuple. (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"])))) ]
+ (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple "foo" 42))))
+ (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple :foo 42))))
+
+ (is (= {"bar" "hello"} (.getMap (dissoc tuple "foo"))))
+ (is (= {"bar" "hello"} (.getMap (dissoc tuple :foo))))
+
+ (is (= {"foo" 42 "bar" "world"} (.getMap (assoc
+ (assoc tuple "foo" 42)
+ :bar "world"))))))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/resources/log4j2-test.xml
----------------------------------------------------------------------
diff --git a/storm-clojure/test/resources/log4j2-test.xml b/storm-clojure/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..e8ae19e
--- /dev/null
+++ b/storm-clojure/test/resources/log4j2-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration monitorInterval="60">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT" follow="true">
+ <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger name="org.apache.zookeeper" level="WARN"/>
+ <Root level="${env:LOG_LEVEL:-INFO}">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</configuration>
+
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-clojure/test/resources/test_runner.clj
----------------------------------------------------------------------
diff --git a/storm-clojure/test/resources/test_runner.clj b/storm-clojure/test/resources/test_runner.clj
new file mode 100644
index 0000000..c10fec3
--- /dev/null
+++ b/storm-clojure/test/resources/test_runner.clj
@@ -0,0 +1,114 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.testrunner)
+
+(import `java.util.Properties)
+(import `java.io.ByteArrayOutputStream)
+(import `java.io.FileInputStream)
+(import `java.io.FileOutputStream)
+(import `java.io.FileWriter)
+(import `java.io.File)
+(import `java.io.OutputStream)
+(import `java.io.OutputStreamWriter)
+(import `java.io.PrintStream)
+(import `java.io.PrintWriter)
+(use 'clojure.test)
+(use 'clojure.test.junit)
+
+(def props (Properties.))
+(.load props (FileInputStream. (first *command-line-args*)))
+
+(def namespaces (into []
+ (for [[key val] props
+ :when (.startsWith key "ns.")]
+ (symbol val))))
+
+(def output-dir (.get props "outputDir"))
+
+(dorun (for [ns namespaces]
+ (require ns)))
+
+(.mkdirs (File. output-dir))
+
+(let [sys-out System/out
+ sys-err System/err
+ num-bad (atom 0)
+ original-junit-report junit-report
+ orig-out *out*]
+ (dorun (for [ns namespaces]
+ (with-open [out-stream (FileOutputStream. (str output-dir "/" ns ".xml"))
+ print-writer (PrintWriter. out-stream true)
+ print-stream (PrintStream. out-stream true)]
+ (.println sys-out (str "Running " ns))
+ (try
+ (let [in-sys-out (atom false)]
+ (binding [*test-out* print-writer
+ *out* orig-out
+ junit-report (fn [data]
+ (let [type (data :type)]
+ (cond
+ (= type :begin-test-var) (do
+ (when @in-sys-out
+ (reset! in-sys-out false)
+ (System/setOut sys-out)
+ (System/setErr sys-err)
+ (set! *out* orig-out)
+ (with-test-out
+ (print "]]>")
+ (finish-element 'system-out true)))
+ (original-junit-report data)
+ (reset! in-sys-out true)
+ (with-test-out
+ (start-element 'system-out true)
+ (print "<![CDATA[") (flush))
+ (System/setOut print-stream)
+ (System/setErr print-stream)
+ (set! *out* print-writer))
+ (= type :end-test-var) (when @in-sys-out
+ (reset! in-sys-out false)
+ (System/setOut sys-out)
+ (System/setErr sys-err)
+ (set! *out* orig-out)
+ (with-test-out
+ (print "]]>")
+ (finish-element 'system-out true)))
+ (= type :fail) (when @in-sys-out
+ (reset! in-sys-out false)
+ (System/setOut sys-out)
+ (System/setErr sys-err)
+ (set! *out* orig-out)
+ (with-test-out
+ (print "]]>")
+ (finish-element 'system-out true)))
+ (= type :error) (when @in-sys-out
+ (reset! in-sys-out false)
+ (System/setOut sys-out)
+ (System/setErr sys-err)
+ (set! *out* orig-out)
+ (with-test-out
+ (print "]]>")
+ (finish-element 'system-out true))))
+ (if (not (= type :begin-test-var)) (original-junit-report data))))]
+ (with-junit-output
+ (let [result (run-tests ns)]
+ (.println sys-out (str "Tests run: " (result :test) ", Passed: " (result :pass) ", Failures: " (result :fail) ", Errors: " (result :error)))
+ (reset! num-bad (+ @num-bad (result :error) (result :fail)))))))
+ (finally
+ (System/setOut sys-out)
+ (System/setErr sys-err))))))
+ (shutdown-agents)
+ (System/exit (if (> @num-bad 0) 1 0)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 0014aa5..e9adaf8 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -38,6 +38,12 @@
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
+ <artifactId>storm-clojure</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>${project.version}</version>
</dependency>
@@ -282,9 +288,6 @@
<groupId>metrics-clojure</groupId>
<artifactId>metrics-clojure</artifactId>
</dependency>
- <!-- hamcrest-core dependency is shaded inside the mockito-all and junit depends on newer version of hamcrest-core.
- To give higher precedence to classes from newer version of hamcrest-core, Junit has been placed above mockito.
- -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -448,7 +451,7 @@
</goals>
<configuration>
<junitOutput>true</junitOutput>
- <testScript>test/resources/test_runner.clj</testScript>
+ <testScript>../storm-clojure/test/resources/test_runner.clj</testScript>
<!-- argLine is set by JaCoCo for code coverage -->
<vmargs>-Xmx1536m ${argLine} ${test.extra.args}</vmargs>
<!-- Run clojure unit tests or all tests (including integration tests) depending on the profile enabled -->
@@ -477,188 +480,6 @@
</configuration>
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
- <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- <minimizeJar>false</minimizeJar>
- <artifactSet>
- <includes>
- <include>ns-tracker:ns-tracker</include>
- <include>hiccup:hiccup</include>
- <include>ring:*</include>
- <include>compojure:compojure</include>
- <include>clj-time:clj-time</include>
- <include>org.clojure:math.numeric-tower</include>
- <include>org.clojure:tools.cli</include>
- <include>org.clojure:tools.logging</include>
- <include>org.clojure:tools.macro</include>
- <include>org.clojure:java.jmx</include>
- <include>clout:clout</include>
- <include>org.clojure:tools.namespace</include>
- <include>cheshire:cheshire</include>
- <include>org.clojure:core.incubator</include>
- <include>metrics-clojure:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>cheshire</pattern>
- <shadedPattern>org.apache.storm.shade.cheshire</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.tools.logging</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.tools.logging</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.core.incubator</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.core.incubator</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.tools.namespace</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.tools.namespace</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clout</pattern>
- <shadedPattern>org.apache.storm.shade.clout</shadedPattern>
- </relocation>
- <relocation>
- <pattern>compojure</pattern>
- <shadedPattern>org.apache.storm.shade.compojure</shadedPattern>
- </relocation>
- <relocation>
- <pattern>ns_tracker</pattern>
- <shadedPattern>org.apache.storm.shade.ns_tracker</shadedPattern>
- </relocation>
- <relocation>
- <pattern>ns-tracker</pattern>
- <shadedPattern>org.apache.storm.shade.ns-tracker</shadedPattern>
- </relocation>
- <relocation>
- <pattern>hiccup</pattern>
- <shadedPattern>org.apache.storm.shade.hiccup</shadedPattern>
- </relocation>
- <relocation>
- <pattern>ring</pattern>
- <shadedPattern>org.apache.storm.shade.ring</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clj_time</pattern>
- <shadedPattern>org.apache.storm.shade.clj_time</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clj-time</pattern>
- <shadedPattern>org.apache.storm.shade.clj-time</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.math</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.math</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.tools.cli</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.tools.cli</shadedPattern>
- </relocation>
- <relocation>
- <pattern>cljs.tools.cli</pattern>
- <shadedPattern>org.apache.storm.shade.cljs.tools.cli</shadedPattern>
- </relocation>
- <relocation>
- <pattern>clojure.tools.macro</pattern>
- <shadedPattern>org.apache.storm.shade.clojure.tools.macro</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.core</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.counters</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.counters</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.gauges</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.gauges</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.histograms</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.histograms</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.meters</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.meters</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.reporters</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.reporters</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.timers</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.timers</shadedPattern>
- </relocation>
- <relocation>
- <pattern>metrics.utils</pattern>
- <shadedPattern>org.apache.storm.shade.metrics.utils</shadedPattern>
- </relocation>
- </relocations>
- <transformers>
- <transformer implementation="org.apache.storm.maven.shade.clojure.ClojureTransformer" />
- </transformers>
- <filters>
- <!-- Several of these filters remove the .clj files from the shaded dependencies, even though only .clj files are in these jars.
- The reason for this is a bit complex, but intentional. During the build process all of the dependency .clj files are
- compiled down into .class files, and included in storm-core.jar. The regular shade transformer handles these in
- the majority of cases correctly. However, the Clojure-Transformer does not shade everything correctly all the
- time. Instead of spending a lot of time to get the Clojure-Transformer to parse Clojure correctly we opted to remove
- the .clj files from the uber jar. -->
- <filter><artifact>metrics-clojure:*</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:core.incubator</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>cheshire:cheshire</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:tools.logging</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:tools.namespace</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:math.numeric-tower</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:tools.macro</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>org.clojure:tools.cli</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>ns-tracker:ns-tracker</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>clout:clout</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>hiccup:hiccup</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>clj-time:clj-time</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>ring:*</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter><artifact>compojure:compojure</artifact><excludes><exclude>**/*.clj</exclude></excludes></filter>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.sf</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.dsa</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>META-INF/*.rsa</exclude>
- <exclude>META-INF/*.EC</exclude>
- <exclude>META-INF/*.ec</exclude>
- <exclude>META-INF/MSFTSIG.SF</exclude>
- <exclude>META-INF/MSFTSIG.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>maven-shade-clojure-transformer</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </plugin>
- <plugin>
<groupId>org.apache.storm</groupId>
<artifactId>storm-maven-plugins</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
deleted file mode 100644
index bf7cd12..0000000
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ /dev/null
@@ -1,37 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.config
- (:import [org.apache.storm DaemonConfig Config])
- (:import [org.apache.storm.validation ConfigValidation]))
-
-(defn- clojure-config-name [name]
- (.replace (.toUpperCase name) "_" "-"))
-
-; define clojure constants for every configuration parameter
-(doseq [f (seq (.getDeclaredFields DaemonConfig))]
- (when (ConfigValidation/isFieldAllowed f)
- (let [name (.getName f)
- new-name (clojure-config-name name)]
- (eval
- `(def ~(symbol new-name) (. DaemonConfig ~(symbol name)))))))
-
-(doseq [f (seq (.getDeclaredFields Config))]
- (when (ConfigValidation/isFieldAllowed f)
- (let [name (.getName f)
- new-name (clojure-config-name name)]
- (eval
- `(def ~(symbol new-name) (. Config ~(symbol name)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 05c887b..8720fca 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -18,7 +18,7 @@
(:use [clojure.set :only [difference intersection]])
(:use [clojure.string :only [blank? split]])
(:use [hiccup core page-helpers form-helpers])
- (:use [org.apache.storm config util log])
+ (:use [org.apache.storm config daemon-config util log])
(:use [org.apache.storm.ui helpers])
(:import [org.apache.storm StormTimer]
[org.apache.storm.daemon.supervisor SupervisorUtils]
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/daemon_config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon_config.clj b/storm-core/src/clj/org/apache/storm/daemon_config.clj
new file mode 100644
index 0000000..9d74c4f
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon_config.clj
@@ -0,0 +1,30 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.daemon-config
+ (:import [org.apache.storm DaemonConfig])
+ (:import [org.apache.storm.validation ConfigValidation]))
+
+(defn- clojure-config-name [name]
+ (.replace (.toUpperCase name) "_" "-"))
+
+; define clojure constants for every configuration parameter
+(doseq [f (seq (.getDeclaredFields DaemonConfig))]
+ (when (ConfigValidation/isFieldAllowed f)
+ (let [name (.getName f)
+ new-name (clojure-config-name name)]
+ (eval
+ `(def ~(symbol new-name) (. DaemonConfig ~(symbol name)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/internal/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/internal/clojure.clj b/storm-core/src/clj/org/apache/storm/internal/clojure.clj
deleted file mode 100644
index f27ac04..0000000
--- a/storm-core/src/clj/org/apache/storm/internal/clojure.clj
+++ /dev/null
@@ -1,204 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.internal.clojure
- (:use [org.apache.storm util])
- (:import [org.apache.storm StormSubmitter])
- (:import [org.apache.storm.generated StreamInfo])
- (:import [org.apache.storm.tuple Tuple])
- (:import [org.apache.storm.task OutputCollector IBolt TopologyContext])
- (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
- (:import [org.apache.storm.utils Utils])
- (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
- (:import [java.util Collection List])
- (:require [org.apache.storm.internal [thrift :as thrift]]))
-
-(defn direct-stream [fields]
- (StreamInfo. fields true))
-
-(defn to-spec [avar]
- (let [m (meta avar)]
- [(str (:ns m)) (str (:name m))]))
-
-(defn clojure-bolt* [output-spec fn-var conf-fn-var args]
- (ClojureBolt. (to-spec fn-var) (to-spec conf-fn-var) args (thrift/mk-output-spec output-spec)))
-
-(defmacro clojure-bolt [output-spec fn-sym conf-fn-sym args]
- `(clojure-bolt* ~output-spec (var ~fn-sym) (var ~conf-fn-sym) ~args))
-
-(defn clojure-spout* [output-spec fn-var conf-var args]
- (let [m (meta fn-var)]
- (ClojureSpout. (to-spec fn-var) (to-spec conf-var) args (thrift/mk-output-spec output-spec))
- ))
-
-(defmacro clojure-spout [output-spec fn-sym conf-sym args]
- `(clojure-spout* ~output-spec (var ~fn-sym) (var ~conf-sym) ~args))
-
-(defn normalize-fns [body]
- (for [[name args & impl] body
- :let [args (-> "this"
- gensym
- (cons args)
- vec)]]
- (concat [name args] impl)
- ))
-
-(defmacro bolt [& body]
- (let [[bolt-fns other-fns] (split-with #(not (symbol? %)) body)
- fns (normalize-fns bolt-fns)]
- `(reify IBolt
- ~@fns
- ~@other-fns)))
-
-(defmacro bolt-execute [& body]
- `(bolt
- (~'execute ~@body)))
-
-(defmacro spout [& body]
- (let [[spout-fns other-fns] (split-with #(not (symbol? %)) body)
- fns (normalize-fns spout-fns)]
- `(reify ISpout
- ~@fns
- ~@other-fns)))
-
-(defmacro defbolt [name output-spec & [opts & impl :as all]]
- (if-not (map? opts)
- `(defbolt ~name ~output-spec {} ~@all)
- (let [worker-name (symbol (str name "__"))
- conf-fn-name (symbol (str name "__conf__"))
- params (:params opts)
- conf-code (:conf opts)
- fn-body (if (:prepare opts)
- (cons 'fn impl)
- (let [[args & impl-body] impl
- coll-sym (nth args 1)
- args (vec (take 1 args))
- prepargs [(gensym "conf") (gensym "context") coll-sym]]
- `(fn ~prepargs (bolt (~'execute ~args ~@impl-body)))))
- definer (if params
- `(defn ~name [& args#]
- (clojure-bolt ~output-spec ~worker-name ~conf-fn-name args#))
- `(def ~name
- (clojure-bolt ~output-spec ~worker-name ~conf-fn-name []))
- )
- ]
- `(do
- (defn ~conf-fn-name ~(if params params [])
- ~conf-code
- )
- (defn ~worker-name ~(if params params [])
- ~fn-body
- )
- ~definer
- ))))
-
-(defmacro defspout [name output-spec & [opts & impl :as all]]
- (if-not (map? opts)
- `(defspout ~name ~output-spec {} ~@all)
- (let [worker-name (symbol (str name "__"))
- conf-fn-name (symbol (str name "__conf__"))
- params (:params opts)
- conf-code (:conf opts)
- prepare? (:prepare opts)
- prepare? (if (nil? prepare?) true prepare?)
- fn-body (if prepare?
- (cons 'fn impl)
- (let [[args & impl-body] impl
- coll-sym (first args)
- prepargs [(gensym "conf") (gensym "context") coll-sym]]
- `(fn ~prepargs (spout (~'nextTuple [] ~@impl-body)))))
- definer (if params
- `(defn ~name [& args#]
- (clojure-spout ~output-spec ~worker-name ~conf-fn-name args#))
- `(def ~name
- (clojure-spout ~output-spec ~worker-name ~conf-fn-name []))
- )
- ]
- `(do
- (defn ~conf-fn-name ~(if params params [])
- ~conf-code
- )
- (defn ~worker-name ~(if params params [])
- ~fn-body
- )
- ~definer
- ))))
-
-(defprotocol TupleValues
- (tuple-values [values collector stream]))
-
-(extend-protocol TupleValues
- java.util.Map
- (tuple-values [this collector ^String stream]
- (let [^TopologyContext context (:context collector)
- fields (.. context (getThisOutputFields stream) toList) ]
- (vec (map (into
- (empty this) (for [[k v] this]
- [(if (keyword? k) (name k) k) v]))
- fields))))
- java.util.List
- (tuple-values [this collector stream]
- this))
-
-(defn- collectify
- [obj]
- (if (or (sequential? obj) (instance? Collection obj))
- obj
- [obj]))
-
-(defnk emit-bolt! [collector values
- :stream Utils/DEFAULT_STREAM_ID :anchor []]
- (let [^List anchor (collectify anchor)
- values (tuple-values values collector stream) ]
- (.emit ^OutputCollector (:output-collector collector) stream anchor values)
- ))
-
-(defnk emit-direct-bolt! [collector task values
- :stream Utils/DEFAULT_STREAM_ID :anchor []]
- (let [^List anchor (collectify anchor)
- values (tuple-values values collector stream) ]
- (.emitDirect ^OutputCollector (:output-collector collector) task stream anchor values)
- ))
-
-(defn ack! [collector ^Tuple tuple]
- (.ack ^OutputCollector (:output-collector collector) tuple))
-
-(defn fail! [collector ^Tuple tuple]
- (.fail ^OutputCollector (:output-collector collector) tuple))
-
-(defn reset-timeout! [collector ^Tuple tuple]
- (.resetTimeout ^OutputCollector (:output-collector collector) tuple))
-
-(defn report-error! [collector ^Tuple tuple]
- (.reportError ^OutputCollector (:output-collector collector) tuple))
-
-(defnk emit-spout! [collector values
- :stream Utils/DEFAULT_STREAM_ID :id nil]
- (let [values (tuple-values values collector stream)]
- (.emit ^SpoutOutputCollector (:output-collector collector) stream values id)))
-
-(defnk emit-direct-spout! [collector task values
- :stream Utils/DEFAULT_STREAM_ID :id nil]
- (let [values (tuple-values values collector stream)]
- (.emitDirect ^SpoutOutputCollector (:output-collector collector) task stream values id)))
-
-(defn submit-remote-topology [name conf topology]
- (StormSubmitter/submitTopology name conf topology))
-
-(defn local-cluster []
- ;; do this to avoid a cyclic dependency of
- ;; LocalCluster -> testing -> nimbus -> bootstrap -> clojure -> LocalCluster
- (eval '(new org.apache.storm.LocalCluster)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/log.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/log.clj b/storm-core/src/clj/org/apache/storm/log.clj
deleted file mode 100644
index 7a006ef..0000000
--- a/storm-core/src/clj/org/apache/storm/log.clj
+++ /dev/null
@@ -1,34 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.log
- (:require [clojure.tools.logging :as log]))
-
-(defmacro log-message
- [& args]
- `(log/info (str ~@args)))
-
-(defmacro log-error
- [e & args]
- `(log/log :error ~e (str ~@args)))
-
-(defmacro log-debug
- [& args]
- `(log/debug (str ~@args)))
-
-(defmacro log-warn
- [& args]
- `(log/warn (str ~@args)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj
index 02ee449..2a07fdc 100644
--- a/storm-core/src/clj/org/apache/storm/ui/core.clj
+++ b/storm-core/src/clj/org/apache/storm/ui/core.clj
@@ -24,7 +24,7 @@
ring.middleware.multipart-params.temp-file)
(:use [ring.middleware.json :only [wrap-json-params]])
(:use [hiccup core page-helpers])
- (:use [org.apache.storm config util log])
+ (:use [org.apache.storm config daemon-config util log])
(:use [org.apache.storm.ui helpers])
(:import [org.apache.storm.utils Time]
[org.apache.storm.generated NimbusSummary]
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj
deleted file mode 100644
index 9ad1f10..0000000
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ /dev/null
@@ -1,134 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.util
- (:import [java.util Map List HashMap])
- (:import [org.apache.storm.generated ErrorInfo])
- (:import [org.apache.storm.utils Utils])
- (:import [java.util List Set])
- (:use [clojure walk])
- (:use [org.apache.storm log]))
-
-;; name-with-attributes by Konrad Hinsen:
-(defn name-with-attributes
- "To be used in macro definitions.
- Handles optional docstrings and attribute maps for a name to be defined
- in a list of macro arguments. If the first macro argument is a string,
- it is added as a docstring to name and removed from the macro argument
- list. If afterwards the first macro argument is a map, its entries are
- added to the name's metadata map and the map is removed from the
- macro argument list. The return value is a vector containing the name
- with its extended metadata map and the list of unprocessed macro
- arguments."
- [name macro-args]
- (let [[docstring macro-args] (if (string? (first macro-args))
- [(first macro-args) (next macro-args)]
- [nil macro-args])
- [attr macro-args] (if (map? (first macro-args))
- [(first macro-args) (next macro-args)]
- [{} macro-args])
- attr (if docstring
- (assoc attr :doc docstring)
- attr)
- attr (if (meta name)
- (conj (meta name) attr)
- attr)]
- [(with-meta name attr) macro-args]))
-
-(defmacro defnk
- "Define a function accepting keyword arguments. Symbols up to the first
- keyword in the parameter list are taken as positional arguments. Then
- an alternating sequence of keywords and defaults values is expected. The
- values of the keyword arguments are available in the function body by
- virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
- defnk accepts an optional docstring as well as an optional metadata map."
- [fn-name & fn-tail]
- (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
- [pos kw-vals] (split-with symbol? args)
- syms (map #(-> % name symbol) (take-nth 2 kw-vals))
- values (take-nth 2 (rest kw-vals))
- sym-vals (apply hash-map (interleave syms values))
- de-map {:keys (vec syms) :or sym-vals}]
- `(defn ~fn-name
- [~@pos & options#]
- (let [~de-map (apply hash-map options#)]
- ~@body))))
-
-(defmacro thrown-cause?
- [klass & body]
- `(try
- ~@body
- false
- (catch Throwable t#
- (let [tc# (Utils/exceptionCauseIsInstanceOf ~klass t#)]
- (if (not tc#) (log-error t# "Exception did not match " ~klass))
- tc#))))
-
-(defn clojurify-structure
- [s]
- (if s
- (prewalk (fn [x]
- (cond (instance? Map x) (into {} x)
- (instance? List x) (vec x)
- (instance? Set x) (into #{} x)
- ;; (Boolean. false) does not evaluate to false in an if.
- ;; This fixes that.
- (instance? Boolean x) (boolean x)
- true x))
- s)))
-; move this func form convert.clj due to cyclic load dependency
-(defn clojurify-error [^ErrorInfo error]
- (if error
- {
- :error (.get_error error)
- :time-secs (.get_error_time_secs error)
- :host (.get_host error)
- :port (.get_port error)
- }
- ))
-
-;TODO: We're keeping this function around until all the code using it is properly tranlated to java
-;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
-(defn map-val
- [afn amap]
- (into {}
- (for [[k v] amap]
- [k (afn v)])))
-
-;TODO: We're keeping this function around until all the code using it is properly tranlated to java
-;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
-(defn filter-key
- [afn amap]
- (into {} (filter (fn [[k v]] (afn k)) amap)))
-
-;TODO: Once all the other clojure functions (100+ locations) are translated to java, this function becomes moot.
-(def not-nil? (complement nil?))
-
-(defmacro dofor [& body]
- `(doall (for ~@body)))
-
-(defmacro -<>
- ([x] x)
- ([x form] (if (seq? form)
- (with-meta
- (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
- (concat begin [x] end))
- (meta form))
- (list form x)))
- ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
-
-(defn hashmap-to-persistent [^HashMap m]
- (zipmap (.keySet m) (.values m)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index 7342070..c5346d7 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -22,7 +22,7 @@
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
(:import [org.apache.storm.utils Time])
(:import [org.apache.storm.tuple Fields])
- (:use [org.apache.storm.internal clojure])
+ (:use [org.apache.storm clojure])
(:use [org.apache.storm config util])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils]))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
index 3c6c58a..87e1fc0 100644
--- a/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/testing4j_test.clj
@@ -15,8 +15,8 @@
;; limitations under the License.
(ns integration.org.apache.storm.testing4j-test
(:use [clojure.test])
- (:use [org.apache.storm config util])
- (:use [org.apache.storm.internal clojure])
+ (:use [org.apache.storm daemon-config config util])
+ (:use [org.apache.storm clojure])
(:require [integration.org.apache.storm.integration-test :as it])
(:require [org.apache.storm.internal.thrift :as thrift])
(:import [org.apache.storm Testing Config]
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index b0cd62b..67e18c1 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -30,7 +30,7 @@
(:import [org.apache.storm Thrift])
(:import [org.mockito ArgumentCaptor Mockito Matchers])
(:use [org.apache.storm config])
- (:use [org.apache.storm.internal clojure])
+ (:use [org.apache.storm clojure])
(:use [conjure core]))
(defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/grouping_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/grouping_test.clj b/storm-core/test/clj/org/apache/storm/grouping_test.clj
index f138e75..9bfb8a7 100644
--- a/storm-core/test/clj/org/apache/storm/grouping_test.clj
+++ b/storm-core/test/clj/org/apache/storm/grouping_test.clj
@@ -19,7 +19,7 @@
[org.apache.storm.generated JavaObject JavaObjectArg Grouping NullStruct])
(:import [org.apache.storm.grouping LoadMapping])
(:use [org.apache.storm log config])
- (:use [org.apache.storm.internal clojure])
+ (:use [org.apache.storm clojure])
(:import [org.apache.storm LocalCluster$Builder Testing Thrift])
(:import [org.apache.storm.utils Utils]
(org.apache.storm.daemon GrouperFactory)))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/logviewer_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/logviewer_test.clj b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
index 4410af0..ae9b651 100644
--- a/storm-core/test/clj/org/apache/storm/logviewer_test.clj
+++ b/storm-core/test/clj/org/apache/storm/logviewer_test.clj
@@ -14,7 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.logviewer-test
- (:use [org.apache.storm config util])
+ (:use [org.apache.storm daemon-config config util])
(:require [org.apache.storm.daemon [logviewer :as logviewer]])
(:require [conjure.core])
(:use [clojure test])
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/metrics_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index 6903cc1..4221d0b 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -26,7 +26,7 @@
(:import [org.apache.storm Testing Testing$Condition LocalCluster$Builder])
(:use [org.apache.storm config])
- (:use [org.apache.storm.internal clojure])
+ (:use [org.apache.storm clojure])
(:use [org.apache.storm.util])
(:import [org.apache.storm Thrift])
(:import [org.apache.storm.utils Utils]
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 39c675e..e91a9b0 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -47,7 +47,7 @@
(:import [org.json.simple JSONValue])
(:import [org.apache.storm.daemon StormCommon])
(:import [org.apache.storm.cluster IStormClusterState StormClusterStateImpl ClusterStateContext ClusterUtils])
- (:use [org.apache.storm util config log])
+ (:use [org.apache.storm util daemon-config config log])
(:require [conjure.core])
(:use [conjure core]))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
index 1c3d94a..339d5b3 100644
--- a/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
+++ b/storm-core/test/clj/org/apache/storm/scheduler/multitenant_scheduler_test.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.scheduler.multitenant-scheduler-test
(:use [clojure test])
- (:use [org.apache.storm config log])
+ (:use [org.apache.storm daemon-config config log])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.daemon.nimbus Nimbus$StandaloneINimbus])
(:import [org.apache.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index fc95097..9133fd9 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -38,7 +38,7 @@
(:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ShellBasedGroupsMapping
ReqContext SimpleTransportPlugin KerberosPrincipalToLocal ThriftConnectionType])
(:import [org.apache.storm.daemon StormCommon])
- (:use [org.apache.storm util config])
+ (:use [org.apache.storm util daemon-config config])
(:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions
KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor]
(org.json.simple JSONValue))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index ecdc337..8476e7e 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -29,7 +29,7 @@
(:import [org.apache.storm.utils ConfigUtils Utils])
(:import [org.apache.storm.cluster IStormClusterState])
(:import [org.mockito Mockito Matchers])
- (:use [org.apache.storm util config log])
+ (:use [org.apache.storm util config daemon-config log])
(:require [conjure.core])
(:use [conjure core]))
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/transactional_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/transactional_test.clj b/storm-core/test/clj/org/apache/storm/transactional_test.clj
index 358a4ba..e84efbc 100644
--- a/storm-core/test/clj/org/apache/storm/transactional_test.clj
+++ b/storm-core/test/clj/org/apache/storm/transactional_test.clj
@@ -37,7 +37,7 @@
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [java.util HashMap Collections ArrayList])
(:use [org.apache.storm util config log])
- (:use [org.apache.storm.internal clojure]))
+ (:use [org.apache.storm clojure]))
;; Testing TODO:
;; * Test that it repeats the meta for a partitioned state (test partitioned emitter on its own)
http://git-wip-us.apache.org/repos/asf/storm/blob/a4cf917d/storm-core/test/clj/org/apache/storm/tuple_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/tuple_test.clj b/storm-core/test/clj/org/apache/storm/tuple_test.clj
deleted file mode 100644
index 5d5dea7..0000000
--- a/storm-core/test/clj/org/apache/storm/tuple_test.clj
+++ /dev/null
@@ -1,52 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.tuple-test
- (:use [clojure test])
- (:import [org.apache.storm Testing])
- (:import [org.apache.storm.testing MkTupleParam])
- (:import [org.apache.storm.tuple Tuple]))
-
-(deftest test-lookup
- (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
- (is (= 12 (tuple "foo")))
- (is (= 12 (tuple :foo)))
- (is (= 12 (:foo tuple)))
-
- (is (= "hello" (:bar tuple)))
-
- (is (= :notfound (tuple "404" :notfound)))))
-
-(deftest test-indexed
- (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
- (is (= 12 (nth tuple 0)))
- (is (= "hello" (nth tuple 1)))))
-
-(deftest test-seq
- (let [ tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
- (is (= [["foo" 12] ["bar" "hello"]] (seq tuple)))))
-
-(deftest test-map
- (let [tuple (Testing/testTuple [12 "hello"] (doto (MkTupleParam. ) (.setFieldsList ["foo" "bar"]))) ]
- (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple "foo" 42))))
- (is (= {"foo" 42 "bar" "hello"} (.getMap (assoc tuple :foo 42))))
-
- (is (= {"bar" "hello"} (.getMap (dissoc tuple "foo"))))
- (is (= {"bar" "hello"} (.getMap (dissoc tuple :foo))))
-
- (is (= {"foo" 42 "bar" "world"} (.getMap (assoc
- (assoc tuple "foo" 42)
- :bar "world"))))))
-