You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/05/18 14:08:38 UTC

[11/13] storm git commit: STORM-2477: Result of transforming the code

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index e319a55..702a790 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -56,7 +56,7 @@ public class HBaseWindowsStore implements WindowsStore {
     private final byte[] family;
     private final byte[] qualifier;
 
-    public HBaseWindowsStore(final Map stormConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
+    public HBaseWindowsStore(final Map<String, Object> topoConf, final Configuration config, final String tableName, byte[] family, byte[] qualifier) {
         this.family = family;
         this.qualifier = qualifier;
 
@@ -76,7 +76,7 @@ public class HBaseWindowsStore implements WindowsStore {
         threadLocalWindowKryoSerializer = new ThreadLocal<WindowKryoSerializer>(){
             @Override
             protected WindowKryoSerializer initialValue() {
-                return new WindowKryoSerializer(stormConf);
+                return new WindowKryoSerializer(topoConf);
             }
         };
 
@@ -268,4 +268,4 @@ public class HBaseWindowsStore implements WindowsStore {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
index a455924..f0c8805 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
@@ -42,14 +42,14 @@ public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
         this.qualifier = qualifier;
     }
 
-    public WindowsStore create(Map stormConf) {
+    public WindowsStore create(Map<String, Object> topoConf) {
         Configuration configuration = HBaseConfiguration.create();
         for (Map.Entry<String, Object> entry : config.entrySet()) {
             if (entry.getValue() != null) {
                 configuration.set(entry.getKey(), entry.getValue().toString());
             }
         }
-        return new HBaseWindowsStore(stormConf, configuration, tableName, family, qualifier);
+        return new HBaseWindowsStore(topoConf, configuration, tableName, family, qualifier);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 2008a3e..bb03a11 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -50,11 +50,11 @@ public class ConfluentAvroSerializer extends AbstractAvroSerializer {
      * See Storm's SerializationFactory class for details
      *
      * @param k Unused but needs to be present for Serialization Factory to find this constructor
-     * @param stormConf The global storm configuration. Must define "avro.schemaregistry.confluent" to locate the
+     * @param topoConf The global storm configuration. Must define "avro.schemaregistry.confluent" to locate the
      *                  confluent schema registry. Should in the form of "http://HOST:PORT"
      */
-    public ConfluentAvroSerializer(Kryo k, Map stormConf) {
-        url = (String) stormConf.get("avro.schemaregistry.confluent");
+    public ConfluentAvroSerializer(Kryo k, Map<String, Object> topoConf) {
+        url = (String) topoConf.get("avro.schemaregistry.confluent");
         this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
index ecf8c49..fedf698 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java
@@ -33,4 +33,4 @@ public class GenericAvroSerializer extends AbstractAvroSerializer {
     public Schema getSchema(String fingerPrint) {
         return new Schema.Parser().parse(fingerPrint);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index dadabe8..a0aa8dc 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -76,7 +76,7 @@ public class HdfsBlobStore extends BlobStore {
     private BlobStoreAclHandler _aclHandler;
     private HdfsBlobStoreImpl _hbs;
     private Subject _localSubject;
-    private Map conf;
+    private Map<String, Object> conf;
 
     /**
      * Get the subject from Hadoop so we can use it to validate the acls. There is no direct
@@ -112,7 +112,7 @@ public class HdfsBlobStore extends BlobStore {
     }
 
     @Override
-    public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
+    public void prepare(Map<String, Object> conf, String overrideBase, NimbusInfo nimbusInfo) {
         this.conf = conf;
         prepareInternal(conf, overrideBase, null);
     }
@@ -121,7 +121,7 @@ public class HdfsBlobStore extends BlobStore {
      * Allow a Hadoop Configuration to be passed for testing. If it's null then the hadoop configs
      * must be in your classpath.
      */
-    protected void prepareInternal(Map conf, String overrideBase, Configuration hadoopConf) {
+    protected void prepareInternal(Map<String, Object> conf, String overrideBase, Configuration hadoopConf) {
         this.conf = conf;
         if (overrideBase == null) {
             overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
index d88211c..389dc71 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/blobstore/HdfsClientBlobStore.java
@@ -48,7 +48,7 @@ public class HdfsClientBlobStore extends ClientBlobStore {
     private NimbusClient client;
 
     @Override
-    public void prepare(Map conf) {
+    public void prepare(Map<String, Object> conf) {
         this._conf = conf;
         _blobStore = new HdfsBlobStore();
         _blobStore.prepare(conf, null, null);
@@ -105,7 +105,7 @@ public class HdfsClientBlobStore extends ClientBlobStore {
     }
 
     @Override
-    public boolean setClient(Map conf, NimbusClient client) {
+    public boolean setClient(Map<String, Object> conf, NimbusClient client) {
         this.client = client;
         return true;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 395cced..6de2398 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -99,7 +99,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
      * @param topologyContext
      * @param collector
      */
-    public final void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector){
+    public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector){
         this.writeLock = new Object();
         if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
         if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
@@ -302,7 +302,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
                 this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
     }
 
-    abstract protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
+    abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException;
 
     abstract protected String getWriterKey(Tuple tuple);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
index e173d2a..9ab0e12 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java
@@ -88,7 +88,7 @@ public class AvroGenericRecordBolt extends AbstractHdfsBolt{
     }
 
     @Override
-    protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+    protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing AvroGenericRecord Bolt...");
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 614de6b..ba8b24e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -101,7 +101,7 @@ public class HdfsBolt extends AbstractHdfsBolt{
     }
 
     @Override
-    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+    public void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing HDFS Bolt...");
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index 3c78075..b2120dc 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -118,7 +118,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
     }
 
     @Override
-    public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+    public void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing Sequence File Bolt...");
         if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
index 3bc9904..a3afa18 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
@@ -70,7 +70,7 @@ public class DefaultFileNameFormat implements FileNameFormat {
     }
 
     @Override
-    public void prepare(Map conf, TopologyContext topologyContext) {
+    public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
         this.componentId = topologyContext.getThisComponentId();
         this.taskId = topologyContext.getThisTaskId();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
index 90e99cb..9ef2851 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
@@ -28,7 +28,7 @@ import java.util.Map;
  */
 public interface FileNameFormat extends Serializable {
 
-    void prepare(Map conf, TopologyContext topologyContext);
+    void prepare(Map<String, Object> conf, TopologyContext topologyContext);
 
     /**
      * Returns the filename the HdfsBolt will create.

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
index 43273f6..cb37bdc 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
@@ -56,7 +56,7 @@ public class SimpleFileNameFormat implements FileNameFormat {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void prepare(Map conf, TopologyContext topologyContext) {
+    public void prepare(Map<String, Object> conf, TopologyContext topologyContext) {
         this.componentId = topologyContext.getThisComponentId();
         this.taskId = topologyContext.getThisTaskId();
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
index ad48779..78296b9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -33,4 +33,4 @@ interface FileOffset extends Comparable<FileOffset>, Cloneable {
   /** tests if rhs == currOffset+1 */
   boolean isNextOffset(FileOffset rhs);
   FileOffset clone();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index b956326..43face6 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -89,7 +89,7 @@ public class HdfsSpout extends BaseRichSpout {
 
   private Configuration hdfsConfig;
 
-  private Map conf = null;
+  private Map<String, Object> conf = null;
   private FileLock lock;
   private String spoutId = null;
 
@@ -365,7 +365,7 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
   @SuppressWarnings("deprecation")
-public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
     LOG.info("Opening HDFS Spout");
     this.conf = conf;
     this.commitTimer = new Timer();

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 7ed8639..64b6b7a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -49,7 +49,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
   private final Value value;
 
 
-  public SequenceFileReader(FileSystem fs, Path file, Map conf)
+  public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf)
           throws IOException {
     super(fs, file);
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
@@ -59,7 +59,7 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
     this.offset = new SequenceFileReader.Offset(0,0,0);
   }
 
-  public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset)
+  public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf, String offset)
           throws IOException {
     super(fs, file);
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 6da9860..a393238 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -43,15 +43,15 @@ public class TextFileReader extends AbstractFileReader {
   private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
   private TextFileReader.Offset offset;
 
-  public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
+  public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf) throws IOException {
     this(fs, file, conf, new TextFileReader.Offset(0,0) );
   }
 
-  public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+  public TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, String startOffset) throws IOException {
     this(fs, file, conf, new TextFileReader.Offset(startOffset) );
   }
 
-  private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset)
+  private TextFileReader(FileSystem fs, Path file, Map<String, Object> conf, TextFileReader.Offset startOffset)
           throws IOException {
     super(fs, file);
     offset = startOffset;

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index a863643..07968f2 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -76,7 +76,7 @@ public class HdfsState implements State {
 
         abstract void execute(List<TridentTuple> tuples) throws IOException;
 
-        abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
+        abstract void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException;
 
         abstract long getCurrentOffset() throws  IOException;
 
@@ -106,7 +106,7 @@ public class HdfsState implements State {
         }
 
 
-        void prepare(Map conf, int partitionIndex, int numPartitions) {
+        void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
             } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
@@ -221,7 +221,7 @@ public class HdfsState implements State {
         }
 
         @Override
-        void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
+        void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing HDFS File state...");
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         }
@@ -333,7 +333,7 @@ public class HdfsState implements State {
         }
 
         @Override
-        void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
+        void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing Sequence File State...");
             if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
 
@@ -429,7 +429,7 @@ public class HdfsState implements State {
         this.options = options;
     }
 
-    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
         initLastTxn(conf, partitionIndex);
     }
@@ -476,7 +476,7 @@ public class HdfsState implements State {
         return new TxnRecord(0, options.currentFile.toString(), 0);
     }
 
-    private void initLastTxn(Map conf, int partition) {
+    private void initLastTxn(Map<String, Object> conf, int partition) {
         // include partition id in the file name so that index for different partitions are independent.
         String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition);
         this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
index 3f4400b..e76ec22 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsStateFactory.java
@@ -37,7 +37,7 @@ public class HdfsStateFactory implements StateFactory {
     }
 
     @Override
-    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         HdfsState state = new HdfsState(this.options);
         state.prepare(conf, metrics, partitionIndex, numPartitions);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
index 33871f5..a952b36 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
@@ -67,7 +67,7 @@ public class DefaultFileNameFormat implements FileNameFormat {
     }
 
     @Override
-    public void prepare(Map conf, int partitionIndex, int numPartitions) {
+    public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
         this.partitionIndex = partitionIndex;
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
index c246aea..c5b0698 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
@@ -26,7 +26,7 @@ import java.util.Map;
  */
 public interface FileNameFormat extends Serializable {
 
-    void prepare(Map conf, int partitionIndex, int numPartitions);
+    void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions);
 
     /**
      * Returns the filename the HdfsBolt will create.

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
index 239c6ca..c676324 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
@@ -53,7 +53,7 @@ public class SimpleFileNameFormat implements FileNameFormat {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void prepare(Map conf, int partitionIndex, int numPartitions) {
+    public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
         this.partitionIndex = partitionIndex;
         try {
             this.host = Utils.localHostname();

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index f66c9b2..6c9cd55 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -63,7 +63,7 @@ public class BlobStoreTest {
   protected static Configuration hadoopConf = null;
   URI base;
   File baseFile;
-  private static Map conf = new HashMap();
+  private static Map<String, Object> conf = new HashMap();
   public static final int READ = 0x01;
   public static final int WRITE = 0x02;
   public static final int ADMIN = 0x04;
@@ -164,7 +164,7 @@ public class BlobStoreTest {
     } catch (IOException e) {
       LOG.error("error creating MiniDFSCluster");
     }
-    Map conf = new HashMap();
+    Map<String, Object> conf = new HashMap();
     conf.put(Config.BLOBSTORE_DIR, dirName);
     conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"org.apache.storm.security.auth.DefaultPrincipalToLocal");
     conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
index c49c44b..e34ec4a 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/blobstore/HdfsBlobStoreImplTest.java
@@ -97,7 +97,7 @@ public class HdfsBlobStoreImplTest {
         String validKey = "validkeyBasic";
 
         FileSystem fs = dfscluster.getFileSystem();
-        Map conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
 
         TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf);
         // should have created blobDir
@@ -215,7 +215,7 @@ public class HdfsBlobStoreImplTest {
     @Test
     public void testGetFileLength() throws IOException {
         FileSystem fs = dfscluster.getFileSystem();
-        Map conf = new HashMap();
+        Map<String, Object> conf = new HashMap<>();
         String validKey = "validkeyBasic";
         String testString = "testingblob";
         TestHdfsBlobStoreImpl hbs = new TestHdfsBlobStoreImpl(blobDir, conf, hadoopConf);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index f60cbf3..f346aef 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -121,7 +121,7 @@ public class TestHdfsSpout {
     spout.setCommitFrequencyCount(1);
     spout.setCommitFrequencySec(1);
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
 
     runSpout(spout,"r11");
@@ -143,7 +143,7 @@ public class TestHdfsSpout {
     spout.setCommitFrequencyCount(1);
     spout.setCommitFrequencySec(1);
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
     openSpout(spout, 0, conf);
 
@@ -176,7 +176,7 @@ public class TestHdfsSpout {
     spout2.setCommitFrequencySec(1000);  // effectively disable commits based on time
     spout2.setLockTimeoutSec(lockExpirySec);
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
     openSpout(spout2, 1, conf);
 
@@ -233,7 +233,7 @@ public class TestHdfsSpout {
     spout2.setCommitFrequencySec(1000); // effectively disable commits based on time
     spout2.setLockTimeoutSec(lockExpirySec);
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
     openSpout(spout2, 1, conf);
 
@@ -348,7 +348,7 @@ public class TestHdfsSpout {
     spout.setCommitFrequencyCount(1);
     spout.setCommitFrequencySec(1);
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
     openSpout(spout, 0, conf);
 
@@ -427,7 +427,7 @@ public class TestHdfsSpout {
 
 
     HdfsSpout spout = makeSpout(Configs.SEQ, SequenceFileReader.defaultFields);
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
 
     // consume both files
@@ -455,7 +455,7 @@ public class TestHdfsSpout {
 
     // 2) run spout
     HdfsSpout spout = makeSpout(MockTextFailingReader.class.getName(), MockTextFailingReader.defaultFields);
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
 
     List<String> res = runSpout(spout, "r11");
@@ -481,7 +481,7 @@ public class TestHdfsSpout {
      spout.setCommitFrequencySec(1000);  // effectively disable commits based on time
 
 
-     Map conf = getCommonConfigs();
+     Map<String, Object> conf = getCommonConfigs();
      openSpout(spout, 0, conf);
 
      // 1) read initial lines in file, then check if lock exists
@@ -531,7 +531,7 @@ public class TestHdfsSpout {
     spout.setCommitFrequencyCount(2);   // 1 lock log entry every 2 tuples
     spout.setCommitFrequencySec(1000);  // Effectively disable commits based on time
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
 
     // 1) read 5 lines in file,
@@ -558,7 +558,7 @@ public class TestHdfsSpout {
     spout.setCommitFrequencyCount(0); // disable it
     spout.setCommitFrequencySec(2);   // log every 2 sec
 
-    Map conf = getCommonConfigs();
+    Map<String, Object> conf = getCommonConfigs();
     openSpout(spout, 0, conf);
 
     // 1) read 5 lines in file
@@ -589,7 +589,7 @@ public class TestHdfsSpout {
 
 
   private Map getCommonConfigs() {
-    Map conf = new HashMap();
+    Map<String, Object> conf = new HashMap<>();
     conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "0");
     return conf;
   }
@@ -605,7 +605,7 @@ public class TestHdfsSpout {
     return spout;
   }
 
-  private void openSpout(HdfsSpout spout, int spoutId, Map conf) {
+  private void openSpout(HdfsSpout spout, int spoutId, Map<String, Object> conf) {
     MockCollector collector = new MockCollector();
     spout.open(conf, new MockTopologyContext(spoutId), collector);
   }
@@ -725,7 +725,7 @@ public class TestHdfsSpout {
     public static final String[] defaultFields = {"line"};
     int readAttempts = 0;
 
-    public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException {
+    public MockTextFailingReader(FileSystem fs, Path file, Map<String, Object> conf) throws IOException {
       super(fs, file, conf);
     }
 
@@ -745,7 +745,7 @@ public class TestHdfsSpout {
     private final int componentId;
 
     public MockTopologyContext(int componentId) {
-      // StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
+      // StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
       super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
       this.componentId = componentId;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
index 4480441..e018016 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/trident/HdfsStateTest.java
@@ -60,7 +60,7 @@ public class HdfsStateTest {
         private String currentFileName = "";
 
         @Override
-        public void prepare(Map conf, int partitionIndex, int numPartitions) {
+        public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
 
         }
 
@@ -94,7 +94,7 @@ public class HdfsStateTest {
                 .withRotationPolicy(rotationPolicy)
                 .withFsUrl("file://" + TEST_OUT_DIR);
 
-        Map<String, String> conf = new HashMap<>();
+        Map<String, Object> conf = new HashMap<>();
         conf.put(Config.TOPOLOGY_NAME, TEST_TOPOLOGY_NAME);
 
         HdfsState state = new HdfsState(options);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index 0215b32..d1019e7 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -68,7 +68,7 @@ public class HiveBolt extends BaseRichBolt {
     }
 
     @Override
-    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector collector)  {
+    public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector)  {
         try {
             tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
             try {

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
index 4222640..26beee0 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
@@ -103,7 +103,7 @@ public class HiveUtils {
         }
     }
 
-    public static boolean isTokenAuthEnabled(Map conf) {
+    public static boolean isTokenAuthEnabled(Map<String, Object> conf) {
         return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null && (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName()));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index 10b3591..a7685f0 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -73,7 +73,7 @@ public class HiveState implements State {
     public void commit(Long txId) {
     }
 
-    public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
+    public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {
         try {
             tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
             try {

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index d6e3c71..6659825 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -40,7 +40,7 @@ public class HiveStateFactory implements StateFactory {
     }
 
     @Override
-    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         HiveState state = new HiveState(this.options);
         state.prepare(conf, metrics, partitionIndex, numPartitions);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
index d691e75..9b3b614 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -196,7 +196,7 @@ public class JmsBolt extends BaseTickTupleAwareRichBolt {
      * Initializes JMS resources.
      */
     @Override
-    public void prepare(Map stormConf, TopologyContext context,
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
                         OutputCollector collector) {
         if (this.jmsProvider == null || this.producer == null) {
             throw new IllegalStateException("JMS Provider and MessageProducer not set.");

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index e69ff3a..042e643 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -164,7 +164,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener {
      * topic/queue.
      */
     @SuppressWarnings("rawtypes")
-    public void open(Map conf, TopologyContext context,
+    public void open(Map<String, Object> conf, TopologyContext context,
                      SpoutOutputCollector collector) {
         if (this.jmsProvider == null) {
             throw new IllegalStateException("JMS provider has not been set.");

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
index 96e00ad..31f4d0d 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
@@ -163,12 +163,12 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
     
     @Override
     public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(
-            String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
+            String txStateId, @SuppressWarnings("rawtypes") Map<String, Object> conf, TopologyContext context) {
         return new JmsBatchCoordinator(name);
     }
 
     @Override
-    public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
+    public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map<String, Object> conf, TopologyContext context) {
         return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf);
     }
 
@@ -210,7 +210,7 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
        
         private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
  
-        public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) {
+        public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map<String, Object> conf) {
             if (jmsProvider == null) {
                 throw new IllegalStateException("JMS provider has not been set.");
             }
@@ -406,4 +406,4 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
 
 }
 
-    
\ No newline at end of file
+    

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index 30f97a0..1a401b2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -100,7 +100,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         LOG.info("Preparing bolt with configuration {}", this);
         //for backward compatibility.
         if (mapper == null) {
@@ -110,10 +110,10 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
 
         //for backward compatibility.
         if (topicSelector == null) {
-            if (stormConf.containsKey(TOPIC)) {
+            if (topoConf.containsKey(TOPIC)) {
                 LOG.info("TopicSelector not specified. Using [{}] for topic [{}] specified in bolt configuration,",
-                        DefaultTopicSelector.class.getSimpleName(), stormConf.get(TOPIC));
-                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+                        DefaultTopicSelector.class.getSimpleName(), topoConf.get(TOPIC));
+                this.topicSelector = new DefaultTopicSelector((String) topoConf.get(TOPIC));
             } else {
                 throw new IllegalStateException("topic should be specified in bolt's configuration");
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 310902e..bb76535 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -96,7 +96,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
         initialized = false;
         this.context = context;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
index df3e800..079cadb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
@@ -75,4 +75,4 @@ public class ManualPartitionNamedSubscription extends NamedSubscription {
             consumer.assign(currentAssignment);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
index cf4dfcb..5f86605 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
@@ -73,4 +73,4 @@ public class ManualPartitionPatternSubscription extends PatternSubscription {
             consumer.assign(currentAssignment);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
index 2a2e1cb..00c9d39 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -72,4 +72,4 @@ public class Timer {
         }
         return expired;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 0f7f0af..b8393ca 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -46,12 +46,12 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
     }
 
     @Override
-    public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map conf, TopologyContext context) {
+    public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map<String, Object> conf, TopologyContext context) {
         return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
     }
 
     @Override
-    public Coordinator<List<TopicPartition>> getCoordinator(Map conf, TopologyContext context) {
+    public Coordinator<List<TopicPartition>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
         return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
index 49e3b07..8b05890 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -28,12 +28,12 @@ import java.util.Map;
 // TODO
 public class KafkaTridentSpoutTransactional<Ps, P extends ISpoutPartition, T> implements IPartitionedTridentSpout<Ps, P, T> {
     @Override
-    public Coordinator<Ps> getCoordinator(Map conf, TopologyContext context) {
+    public Coordinator<Ps> getCoordinator(Map<String, Object> conf, TopologyContext context) {
         return null;
     }
 
     @Override
-    public Emitter<Ps, P, T> getEmitter(Map conf, TopologyContext context) {
+    public Emitter<Ps, P, T> getEmitter(Map<String, Object> conf, TopologyContext context) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
index f564510..0bf21ab 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -52,7 +52,7 @@ public class TridentKafkaStateFactory implements StateFactory {
     }
 
     @Override
-    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         TridentKafkaState state = new TridentKafkaState()
                 .withKafkaTopicSelector(this.topicSelector)

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 93b1040..004d62c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -111,4 +111,4 @@ public class KafkaUnit {
     private void closeProducer() {
         producer.close();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
index 6e90c9d..c755f41 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -43,4 +43,4 @@ public class KafkaUnitRule extends ExternalResource {
     public KafkaUnit getKafkaUnit() {
         return this.kafkaUnit;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
index 7a94a50..8d183ee 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -33,7 +33,7 @@ public class KafkaSpoutTestBolt extends BaseRichBolt {
     private OutputCollector collector;
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
index 13e676a..c203359 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -43,7 +43,7 @@ public class DynamicBrokersReader {
     private String _topic;
     private Boolean _isWildcardTopic;
 
-    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
+    public DynamicBrokersReader(Map<String, Object> conf, String zkStr, String zkPath, String topic) {
         // Check required parameters
         Preconditions.checkNotNull(conf, "conf cannot be null");
 
@@ -200,7 +200,7 @@ public class DynamicBrokersReader {
      * Validate required parameters in the input configuration Map
      * @param conf
      */
-    private void validateConfig(final Map conf) {
+    private void validateConfig(final Map<String, Object> conf) {
         Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
                 "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
         Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 90cf440..60654a5 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -40,7 +40,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
 
     }
 
-    public void prepare(SpoutConfig spoutConfig, Map stormConf) {
+    public void prepare(SpoutConfig spoutConfig, Map<String, Object> topoConf) {
         this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
         this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
         this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index c1fb96e..c7a7a04 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -26,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
     /**
      * Initialization
      */
-    void prepare(SpoutConfig spoutConfig, Map stormConf);
+    void prepare(SpoutConfig spoutConfig, Map<String, Object> topoConf);
 
     /**
      * Message corresponding to the offset failed in kafka spout.

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 01cc9b7..ead6057 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -58,7 +58,7 @@ public class KafkaSpout extends BaseRichSpout {
     }
 
     @Override
-    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+    public void open(Map<String, Object> conf, final TopologyContext context, final SpoutOutputCollector collector) {
         _collector = collector;
         String topologyInstanceId = context.getStormId();
         Map<String, Object> stateConf = new HashMap<>(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 2072df3..604f1f3 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -61,11 +61,11 @@ public class KafkaUtils {
         throw new AssertionError();
     }
 
-    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+    public static IBrokerReader makeBrokerReader(Map<String, Object> topoConf, KafkaConfig conf) {
         if (conf.hosts instanceof StaticHosts) {
             return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
         } else {
-            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+            return new ZkBrokerReader(topoConf, conf.topic, (ZkHosts) conf.hosts);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f761d21..ad71bbe 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -64,18 +64,18 @@ public class PartitionManager {
     SimpleConsumer _consumer;
     DynamicPartitionConnections _connections;
     ZkState _state;
-    Map _stormConf;
+    Map _topoConf;
     long numberFailed, numberAcked;
 
     public PartitionManager(
             DynamicPartitionConnections connections,
             String topologyInstanceId,
             ZkState state,
-            Map stormConf,
+            Map<String, Object> topoConf,
             SpoutConfig spoutConfig,
             Partition id)
     {
-        this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null);
+        this(connections, topologyInstanceId, state, topoConf, spoutConfig, id, null);
     }
 
     /**
@@ -85,7 +85,7 @@ public class PartitionManager {
             DynamicPartitionConnections connections,
             String topologyInstanceId,
             ZkState state,
-            Map stormConf,
+            Map<String, Object> topoConf,
             SpoutConfig spoutConfig,
             Partition id,
             PartitionManager previousManager) {
@@ -95,7 +95,7 @@ public class PartitionManager {
         _topologyInstanceId = topologyInstanceId;
         _consumer = connections.register(id.host, id.topic, id.partition);
         _state = state;
-        _stormConf = stormConf;
+        _topoConf = topoConf;
         numberAcked = numberFailed = 0;
 
         if (previousManager != null) {
@@ -110,7 +110,7 @@ public class PartitionManager {
         } else {
             try {
                 _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
-                _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
+                _failedMsgRetryManager.prepare(spoutConfig, _topoConf);
             } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                 throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
                         FailedMsgRetryManager.class,
@@ -336,7 +336,7 @@ public class PartitionManager {
             LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition, _topologyInstanceId);
             Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
-                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
+                            "name", _topoConf.get(Config.TOPOLOGY_NAME)))
                     .put("offset", lastCompletedOffset)
                     .put("partition", _partition.partition)
                     .put("broker", ImmutableMap.of("host", _partition.host.host,

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index 628bfc0..46cb7d9 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -26,13 +26,13 @@ public class StaticCoordinator implements PartitionCoordinator {
     Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
     List<PartitionManager> _allManagers = new ArrayList<>();
 
-    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+    public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(hosts.getPartitionInformation());
         List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
         for (Partition myPartition : myPartitions) {
-            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
+            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition));
         }
         _allManagers = new ArrayList<>(_managers.values());
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 14be584..e814157 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -39,28 +39,28 @@ public class ZkCoordinator implements PartitionCoordinator {
     DynamicPartitionConnections _connections;
     DynamicBrokersReader _reader;
     ZkState _state;
-    Map _stormConf;
+    Map _topoConf;
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
-        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
+    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(topoConf, spoutConfig));
     }
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
+    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
         _spoutConfig = spoutConfig;
         _connections = connections;
         _taskIndex = taskIndex;
         _totalTasks = totalTasks;
         _topologyInstanceId = topologyInstanceId;
-        _stormConf = stormConf;
+        _topoConf = topoConf;
         _state = state;
         ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
         _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
         _reader = reader;
     }
 
-    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
+    private static DynamicBrokersReader buildReader(Map<String, Object> topoConf, SpoutConfig spoutConfig) {
         ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
-        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
+        return new DynamicBrokersReader(topoConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
     }
 
     @Override
@@ -102,7 +102,7 @@ public class ZkCoordinator implements PartitionCoordinator {
                         _connections,
                         _topologyInstanceId,
                         _state,
-                        _stormConf,
+                        _topoConf,
                         _spoutConfig,
                         id,
                         deletedManagers.get(id.partition));

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
index 0fff78a..5cb6129 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -93,7 +93,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         //for backward compatibility.
         if(mapper == null) {
             this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
@@ -101,8 +101,8 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
 
         //for backward compatibility.
         if(topicSelector == null) {
-            if(stormConf.containsKey(TOPIC)) {
-                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+            if(topoConf.containsKey(TOPIC)) {
+                this.topicSelector = new DefaultTopicSelector((String) topoConf.get(TOPIC));
             } else {
                 throw new IllegalArgumentException("topic should be specified in bolt's configuration");
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
index baec8cb..76baf62 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
@@ -29,7 +29,7 @@ class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPar
     private IBrokerReader reader;
     private TridentKafkaConfig config;
 
-    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
+    public Coordinator(Map<String, Object> conf, TridentKafkaConfig tridentKafkaConfig) {
         config = tridentKafkaConfig;
         reader = KafkaUtils.makeBrokerReader(conf, config);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index 71324f7..8a47ddc 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -36,13 +36,13 @@ public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<L
     }
 
     @Override
-    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {
+    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map<String, Object> conf, TopologyContext context) {
         return new TridentKafkaEmitter(conf, context, _config, context
                 .getStormId()).asOpaqueEmitter();
     }
 
     @Override
-    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
+    public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext tc) {
         return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 7ce8d52..1042098 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -35,12 +35,12 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
 
 
     @Override
-    public IPartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
+    public IPartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
         return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
     }
 
     @Override
-    public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
+    public IPartitionedTridentSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
         return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter();
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 176a878..1339387 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -61,7 +61,7 @@ public class TridentKafkaEmitter {
     private TridentKafkaConfig _config;
     private String _topologyInstanceId;
 
-    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
+    public TridentKafkaEmitter(Map<String, Object> conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
         _config = config;
         _topologyInstanceId = topologyInstanceId;
         _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
index f564510..0bf21ab 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -52,7 +52,7 @@ public class TridentKafkaStateFactory implements StateFactory {
     }
 
     @Override
-    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+    public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         TridentKafkaState state = new TridentKafkaState()
                 .withKafkaTopicSelector(this.topicSelector)

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
index abc10db..00758a6 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
@@ -38,7 +38,7 @@ public class ZkBrokerReader implements IBrokerReader {
 
 	long refreshMillis;
 
-	public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
+	public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts hosts) {
 		try {
 			reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
 			cachedBrokers = reader.getBrokerInfo();

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index b23d5bc..9778d15 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -44,7 +44,7 @@ public class ZkCoordinatorTest {
 
     private KafkaTestBroker broker = new KafkaTestBroker();
     private TestingServer server;
-    private Map stormConf = new HashMap();
+    private Map<String, Object> topoConf = new HashMap();
     private SpoutConfig spoutConfig;
     private ZkState state;
     private SimpleConsumer simpleConsumer;
@@ -57,14 +57,14 @@ public class ZkCoordinatorTest {
         ZkHosts hosts = new ZkHosts(connectionString);
         hosts.refreshFreqSecs = 1;
         spoutConfig = new SpoutConfig(hosts, "topic", "/test", "id");
-        Map conf = buildZookeeperConfig(server);
+        Map<String, Object> conf = buildZookeeperConfig(server);
         state = new ZkState(conf);
         simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
         when(dynamicPartitionConnections.register(any(Broker.class), any(String.class) ,anyInt())).thenReturn(simpleConsumer);
     }
 
     private Map buildZookeeperConfig(TestingServer server) {
-        Map conf = new HashMap();
+        Map<String, Object> conf = new HashMap();
         conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, server.getPort());
         conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
         conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
@@ -174,7 +174,7 @@ public class ZkCoordinatorTest {
     private List<ZkCoordinator> buildCoordinators(int totalTasks) {
         List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
         for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, "test-id", reader);
             coordinatorList.add(coordinator);
         }
         return coordinatorList;

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
index 500195b..da08a50 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -46,7 +46,7 @@ public class KinesisSpout extends BaseRichSpout {
     }
 
     @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
         kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
         kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
index a2adb9d..7701efd 100644
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
@@ -32,7 +32,7 @@ public class KinesisBoltTest extends BaseRichBolt {
     private transient OutputCollector collector;
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/64e29f36/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
index f730ec7..89d249e 100644
--- a/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
+++ b/external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/AbstractMongoBolt.java
@@ -42,7 +42,7 @@ public abstract class AbstractMongoBolt extends BaseRichBolt {
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context,
+    public void prepare(Map<String, Object> topoConf, TopologyContext context,
             OutputCollector collector) {
         this.collector = collector;
         this.mongoClient = new MongoDBClient(url, collectionName);