You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:04 UTC

[26/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Time.java b/jstorm-core/src/main/java/backtype/storm/utils/Time.java
index 50a79fd..8732008 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Time.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Time.java
@@ -24,86 +24,87 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class Time {
-    public static Logger LOG = LoggerFactory.getLogger(Time.class);    
-    
+    public static Logger LOG = LoggerFactory.getLogger(Time.class);
+
     private static AtomicBoolean simulating = new AtomicBoolean(false);
-    //TODO: should probably use weak references here or something
+    // TODO: should probably use weak references here or something
     private static volatile Map<Thread, AtomicLong> threadSleepTimes;
     private static final Object sleepTimesLock = new Object();
-    
-    private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-    
+
+    private static AtomicLong simulatedCurrTimeMs; // should this be a thread local that's allowed to keep advancing?
+
     public static void startSimulating() {
-        synchronized(sleepTimesLock) {
+        synchronized (sleepTimesLock) {
             simulating.set(true);
             simulatedCurrTimeMs = new AtomicLong(0);
             threadSleepTimes = new ConcurrentHashMap<Thread, AtomicLong>();
         }
     }
-    
+
     public static void stopSimulating() {
-        synchronized(sleepTimesLock) {
-            simulating.set(false);             
-            threadSleepTimes = null;  
+        synchronized (sleepTimesLock) {
+            simulating.set(false);
+            threadSleepTimes = null;
         }
     }
-    
+
     public static boolean isSimulating() {
         return simulating.get();
     }
-    
+
     public static void sleepUntil(long targetTimeMs) throws InterruptedException {
-        if(simulating.get()) {
+        if (simulating.get()) {
             try {
-                synchronized(sleepTimesLock) {
+                synchronized (sleepTimesLock) {
                     threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
                 }
-                while(simulatedCurrTimeMs.get() < targetTimeMs) {
+                while (simulatedCurrTimeMs.get() < targetTimeMs) {
                     Thread.sleep(10);
                 }
             } finally {
-                synchronized(sleepTimesLock) {
+                synchronized (sleepTimesLock) {
                     if (simulating.get()) {
                         threadSleepTimes.remove(Thread.currentThread());
                     }
                 }
             }
         } else {
-            long sleepTime = targetTimeMs-currentTimeMillis();
-            if(sleepTime>0) 
+            long sleepTime = targetTimeMs - currentTimeMillis();
+            if (sleepTime > 0)
                 Thread.sleep(sleepTime);
         }
     }
-    
+
     public static void sleep(long ms) throws InterruptedException {
-        sleepUntil(currentTimeMillis()+ms);
+        sleepUntil(currentTimeMillis() + ms);
     }
-    
+
     public static long currentTimeMillis() {
-        if(simulating.get()) {
+        if (simulating.get()) {
             return simulatedCurrTimeMs.get();
         } else {
             return System.currentTimeMillis();
         }
     }
-    
+
     public static int currentTimeSecs() {
         return (int) (currentTimeMillis() / 1000);
     }
-    
+
     public static void advanceTime(long ms) {
-        if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+        if (!simulating.get())
+            throw new IllegalStateException("Cannot simulate time unless in simulation mode");
         simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
     }
-    
+
     public static boolean isThreadWaiting(Thread t) {
-        if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+        if (!simulating.get())
+            throw new IllegalStateException("Must be in simulation mode");
         AtomicLong time;
-        synchronized(sleepTimesLock) {
+        synchronized (sleepTimesLock) {
             time = threadSleepTimes.get(t);
         }
-        return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
-    }    
+        return !t.isAlive() || time != null && currentTimeMillis() < time.longValue();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
index f0a194f..a29a954 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
@@ -37,18 +37,18 @@ import java.util.Set;
 public class TimeCacheMap<K, V> {
     // this default ensures things expire at most 50% past the expiration time
     private static final int DEFAULT_NUM_BUCKETS = 3;
-    
+
     @Deprecated
     public static interface ExpiredCallback<K, V> {
         public void expire(K key, V val);
     }
-    
+
     private LinkedList<HashMap<K, V>> _buckets;
-    
+
     private final Object _lock = new Object();
     private Thread _cleaner;
     private ExpiredCallback _callback;
-    
+
     public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
         if (numBuckets < 2) {
             throw new IllegalArgumentException("numBuckets must be >= 2");
@@ -57,7 +57,7 @@ public class TimeCacheMap<K, V> {
         for (int i = 0; i < numBuckets; i++) {
             _buckets.add(new HashMap<K, V>());
         }
-        
+
         _callback = callback;
         final long expirationMillis = expirationSecs * 1000L;
         final long sleepTime = expirationMillis / (numBuckets - 1);
@@ -78,26 +78,26 @@ public class TimeCacheMap<K, V> {
                         }
                     }
                 } catch (InterruptedException ex) {
-                    
+
                 }
             }
         });
         _cleaner.setDaemon(true);
         _cleaner.start();
     }
-    
+
     public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
         this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
     }
-    
+
     public TimeCacheMap(int expirationSecs) {
         this(expirationSecs, DEFAULT_NUM_BUCKETS);
     }
-    
+
     public TimeCacheMap(int expirationSecs, int numBuckets) {
         this(expirationSecs, numBuckets, null);
     }
-    
+
     public boolean containsKey(K key) {
         synchronized (_lock) {
             for (HashMap<K, V> bucket : _buckets) {
@@ -108,7 +108,7 @@ public class TimeCacheMap<K, V> {
             return false;
         }
     }
-    
+
     public V get(K key) {
         synchronized (_lock) {
             for (HashMap<K, V> bucket : _buckets) {
@@ -119,7 +119,7 @@ public class TimeCacheMap<K, V> {
             return null;
         }
     }
-    
+
     public void put(K key, V value) {
         synchronized (_lock) {
             Iterator<HashMap<K, V>> it = _buckets.iterator();
@@ -131,7 +131,7 @@ public class TimeCacheMap<K, V> {
             }
         }
     }
-    
+
     public Object remove(K key) {
         synchronized (_lock) {
             for (HashMap<K, V> bucket : _buckets) {
@@ -142,7 +142,7 @@ public class TimeCacheMap<K, V> {
             return null;
         }
     }
-    
+
     public int size() {
         synchronized (_lock) {
             int size = 0;
@@ -152,11 +152,11 @@ public class TimeCacheMap<K, V> {
             return size;
         }
     }
-    
+
     public void cleanup() {
         _cleaner.interrupt();
     }
-    
+
     public Set<K> keySet() {
         Set<K> ret = new HashSet<K>();
         synchronized (_lock) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
index 4638117..48b39b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
@@ -26,88 +26,88 @@ import backtype.storm.messaging.TaskMessage;
 
 public class TransferDrainer {
 
-  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
-  
-  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
-    for (String key : workerTupleSetMap.keySet()) {
-      
-      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
-      if (null == bundle) {
-        bundle = new ArrayList<ArrayList<TaskMessage>>();
-        bundles.put(key, bundle);
-      }
-      
-      ArrayList tupleSet = workerTupleSetMap.get(key);
-      if (null != tupleSet && tupleSet.size() > 0) {
-        bundle.add(tupleSet);
-      }
-    } 
-  }
-  
-  public void send(HashMap<String, IConnection> connections) {
-    for (String hostPort : bundles.keySet()) {
-      IConnection connection = connections.get(hostPort);
-      if (null != connection) { 
-        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
-        for (ArrayList<TaskMessage> list : bundle) {
-            connection.send(list);
+    private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+
+    public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
+        for (String key : workerTupleSetMap.keySet()) {
+
+            ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+            if (null == bundle) {
+                bundle = new ArrayList<ArrayList<TaskMessage>>();
+                bundles.put(key, bundle);
+            }
+
+            ArrayList tupleSet = workerTupleSetMap.get(key);
+            if (null != tupleSet && tupleSet.size() > 0) {
+                bundle.add(tupleSet);
+            }
         }
-        
-      }
-    } 
-  }
-  
-  private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-    
-    if (null == bundle) {
-      return null;
     }
-    
-    return new Iterator<TaskMessage> () {
-      
-      private int offset = 0;
-      private int size = 0;
-      {
-        for (ArrayList<TaskMessage> list : bundle) {
-            size += list.size();
-        }
-      }
-      
-      private int bundleOffset = 0;
-      private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-      
-      @Override
-      public boolean hasNext() {
-        if (offset < size) {
-          return true;
-        }
-        return false;
-      }
-
-      @Override
-      public TaskMessage next() {
-        TaskMessage msg = null;
-        if (iter.hasNext()) {
-          msg = iter.next(); 
-        } else {
-          bundleOffset++;
-          iter = bundle.get(bundleOffset).iterator();
-          msg = iter.next();
+
+    public void send(HashMap<String, IConnection> connections) {
+        for (String hostPort : bundles.keySet()) {
+            IConnection connection = connections.get(hostPort);
+            if (null != connection) {
+                ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+                for (ArrayList<TaskMessage> list : bundle) {
+                    connection.send(list);
+                }
+
+            }
         }
-        if (null != msg) {
-          offset++;
+    }
+
+    private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
+
+        if (null == bundle) {
+            return null;
         }
-        return msg;
-      }
-
-      @Override
-      public void remove() {
-        throw new RuntimeException("not supported");
-      }
-    };
-  }
-  
-  public void clear() {
-    bundles.clear();
-  }
+
+        return new Iterator<TaskMessage>() {
+
+            private int offset = 0;
+            private int size = 0;
+            {
+                for (ArrayList<TaskMessage> list : bundle) {
+                    size += list.size();
+                }
+            }
+
+            private int bundleOffset = 0;
+            private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
+
+            @Override
+            public boolean hasNext() {
+                if (offset < size) {
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public TaskMessage next() {
+                TaskMessage msg = null;
+                if (iter.hasNext()) {
+                    msg = iter.next();
+                } else {
+                    bundleOffset++;
+                    iter = bundle.get(bundleOffset).iterator();
+                    msg = iter.next();
+                }
+                if (null != msg) {
+                    offset++;
+                }
+                return msg;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("not supported");
+            }
+        };
+    }
+
+    public void clear() {
+        bundles.clear();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
index 45725c9..ce2a0b3 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
@@ -22,9 +22,9 @@ import backtype.storm.tuple.Tuple;
 
 public class TupleHelpers {
     private TupleHelpers() {
-        
+
     }
-    
+
     public static boolean isTickTuple(Tuple tuple) {
         return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
index f9fb2c0..80b78d8 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
@@ -22,14 +22,13 @@ import backtype.storm.tuple.Tuple;
 
 public final class TupleUtils {
 
-  private TupleUtils() {
-    // No instantiation
-  }
+    private TupleUtils() {
+        // No instantiation
+    }
 
-  public static boolean isTick(Tuple tuple) {
-    return tuple != null
-           && Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())
-           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
-  }
+    public static boolean isTick(Tuple tuple) {
+        return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
index 0669cfb..9194d07 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
@@ -17,45 +17,21 @@
  */
 package backtype.storm.utils;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
+import backtype.storm.Config;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.ComponentObject;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import backtype.storm.serialization.SerializationDelegate;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import com.alibaba.jstorm.utils.LoadConf;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -64,20 +40,19 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.SafeConstructor;
 
-import backtype.storm.Config;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import backtype.storm.serialization.SerializationDelegate;
-import clojure.lang.IFn;
-import clojure.lang.RT;
-
-import com.alibaba.jstorm.utils.LoadConf;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 public class Utils {
     private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -99,7 +74,7 @@ public class Utils {
         }
     }
 
-    public static Object newInstance(String klass, Object ...params) {
+    public static Object newInstance(String klass, Object... params) {
         try {
             Class c = Class.forName(klass);
             Constructor[] constructors = c.getConstructors();
@@ -111,7 +86,7 @@ public class Utils {
                     break;
                 }
             }
-            
+
             if (con == null) {
                 throw new RuntimeException("Cound not found the corresponding constructor, params=" + params.toString());
             } else {
@@ -128,35 +103,34 @@ public class Utils {
 
     /**
      * Go thrift gzip serializer
+     * 
      * @param obj
      * @return
      */
     public static byte[] serialize(Object obj) {
         /**
-         * @@@
-         * JStorm disable the thrift.gz.serializer
+         * @@@ JStorm disable the thrift.gz.serializer
          */
-        //return serializationDelegate.serialize(obj);
+        // return serializationDelegate.serialize(obj);
         return javaSerialize(obj);
     }
 
     /**
      * Go thrift gzip serializer
-     * @param obj
+     * 
      * @return
      */
     public static <T> T deserialize(byte[] serialized, Class<T> clazz) {
         /**
-         * @@@
-         * JStorm disable the thrift.gz.serializer 
+         * @@@ JStorm disable the thrift.gz.serializer
          */
-        //return serializationDelegate.deserialize(serialized, clazz);
-        return (T)javaDeserialize(serialized);
+        // return serializationDelegate.deserialize(serialized, clazz);
+        return (T) javaDeserialize(serialized);
     }
 
     public static byte[] javaSerialize(Object obj) {
         if (obj instanceof byte[]) {
-            return (byte[])obj;
+            return (byte[]) obj;
         }
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -168,7 +142,7 @@ public class Utils {
             throw new RuntimeException(e);
         }
     }
-    
+
     public static Object maybe_deserialize(byte[] data) {
         if (data == null || data.length == 0) {
             return null;
@@ -179,9 +153,10 @@ public class Utils {
             return null;
         }
     }
-    
+
     /**
      * Deserialized with ClassLoader
+     * 
      * @param serialized
      * @param loader
      * @return
@@ -206,20 +181,20 @@ public class Utils {
             throw new RuntimeException(e);
         }
     }
-    
+
     public static Object javaDeserialize(byte[] serialized) {
         return javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
     }
-    
+
     public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
-        return (T)javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
+        return (T) javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
     }
-    
+
     public static String to_json(Object m) {
         // return JSON.toJSONString(m);
         return JSONValue.toJSONString(m);
     }
-    
+
     public static Object from_json(String json) {
         if (json == null) {
             return null;
@@ -228,14 +203,14 @@ public class Utils {
             return JSONValue.parse(json);
         }
     }
-    
+
     public static String toPrettyJsonString(Object obj) {
         Gson gson2 = new GsonBuilder().setPrettyPrinting().create();
         String ret = gson2.toJson(obj);
-        
+
         return ret;
     }
-    
+
     public static byte[] gzip(byte[] data) {
         try {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -269,9 +244,9 @@ public class Utils {
     public static <T> String join(Iterable<T> coll, String sep) {
         Iterator<T> it = coll.iterator();
         String ret = "";
-        while(it.hasNext()) {
+        while (it.hasNext()) {
             ret = ret + it.next();
-            if(it.hasNext()) {
+            if (it.hasNext()) {
                 ret = ret + sep;
             }
         }
@@ -281,13 +256,14 @@ public class Utils {
     public static void sleep(long millis) {
         try {
             Time.sleep(millis);
-        } catch(InterruptedException e) {
+        } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
     /**
      * Please directly use LoadConf.findResources(name);
+     * 
      * @param name
      * @return
      */
@@ -298,6 +274,7 @@ public class Utils {
 
     /**
      * Please directly use LoadConf.findAndReadYaml(name);
+     * 
      * @param name
      * @return
      */
@@ -306,9 +283,8 @@ public class Utils {
         return LoadConf.findAndReadYaml(name, mustExist, false);
     }
 
-
     public static Map findAndReadConfigFile(String name) {
-    	return LoadConf.findAndReadYaml(name, true, false);
+        return LoadConf.findAndReadYaml(name, true, false);
     }
 
     public static Map readDefaultConfig() {
@@ -318,7 +294,7 @@ public class Utils {
     public static Map readCommandLineOpts() {
         Map ret = new HashMap();
         String commandOptions = System.getProperty("storm.options");
-        if(commandOptions != null) {
+        if (commandOptions != null) {
             String[] configs = commandOptions.split(",");
             for (String config : configs) {
                 config = URLDecoder.decode(config);
@@ -335,22 +311,21 @@ public class Utils {
         return ret;
     }
 
-    
     public static void replaceLocalDir(Map<Object, Object> conf) {
         String stormHome = System.getProperty("jstorm.home");
         boolean isEmpty = StringUtils.isBlank(stormHome);
-        
+
         Map<Object, Object> replaceMap = new HashMap<Object, Object>();
-        
+
         for (Entry entry : conf.entrySet()) {
             Object key = entry.getKey();
             Object value = entry.getValue();
-            
+
             if (value instanceof String) {
                 if (StringUtils.isBlank((String) value) == true) {
                     continue;
                 }
-                
+
                 String str = (String) value;
                 if (isEmpty == true) {
                     // replace %JSTORM_HOME% as current directory
@@ -358,20 +333,20 @@ public class Utils {
                 } else {
                     str = str.replace("%JSTORM_HOME%", stormHome);
                 }
-                
+
                 replaceMap.put(key, str);
             }
         }
-        
+
         conf.putAll(replaceMap);
     }
-    
+
     public static Map loadDefinedConf(String confFile) {
         File file = new File(confFile);
         if (file.exists() == false) {
             return findAndReadConfigFile(confFile, true);
         }
-        
+
         Yaml yaml = new Yaml();
         Map ret;
         try {
@@ -381,10 +356,10 @@ public class Utils {
         }
         if (ret == null)
             ret = new HashMap();
-        
+
         return new HashMap(ret);
     }
-    
+
     public static Map readStormConfig() {
         Map ret = readDefaultConfig();
         String confFile = System.getProperty("storm.conf.file");
@@ -396,40 +371,41 @@ public class Utils {
         }
         ret.putAll(storm);
         ret.putAll(readCommandLineOpts());
-        
+
         replaceLocalDir(ret);
         return ret;
     }
 
     private static Object normalizeConf(Object conf) {
-        if(conf==null) return new HashMap();
-        if(conf instanceof Map) {
+        if (conf == null)
+            return new HashMap();
+        if (conf instanceof Map) {
             Map confMap = new HashMap((Map) conf);
-            for(Object key: confMap.keySet()) {
+            for (Object key : confMap.keySet()) {
                 Object val = confMap.get(key);
                 confMap.put(key, normalizeConf(val));
             }
             return confMap;
-        } else if(conf instanceof List) {
-            List confList =  new ArrayList((List) conf);
-            for(int i=0; i<confList.size(); i++) {
+        } else if (conf instanceof List) {
+            List confList = new ArrayList((List) conf);
+            for (int i = 0; i < confList.size(); i++) {
                 Object val = confList.get(i);
                 confList.set(i, normalizeConf(val));
             }
             return confList;
         } else if (conf instanceof Integer) {
             return ((Integer) conf).longValue();
-        } else if(conf instanceof Float) {
+        } else if (conf instanceof Float) {
             return ((Float) conf).doubleValue();
         } else {
             return conf;
         }
     }
-    
+
     public static boolean isValidConf(Map<String, Object> stormConf) {
         return normalizeConf(stormConf).equals(normalizeConf(Utils.from_json(Utils.to_json(stormConf))));
     }
-    
+
     public static Object getSetComponentObject(ComponentObject obj, URLClassLoader loader) {
         if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
             return javaDeserializeWithCL(obj.get_serialized_java(), loader);
@@ -439,7 +415,7 @@ public class Utils {
             return obj.get_shell();
         }
     }
-    
+
     public static <S, T> T get(Map<S, T> m, S key, T def) {
         T ret = m.get(key);
         if (ret == null) {
@@ -447,7 +423,7 @@ public class Utils {
         }
         return ret;
     }
-    
+
     public static List<Object> tuple(Object... values) {
         List<Object> ret = new ArrayList<Object>();
         for (Object v : values) {
@@ -455,7 +431,7 @@ public class Utils {
         }
         return ret;
     }
-    
+
     public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
         WritableByteChannel out = null;
         NimbusClient client = null;
@@ -478,12 +454,12 @@ public class Utils {
                 client.close();
         }
     }
-	
+
     public static IFn loadClojureFn(String namespace, String name) {
         try {
-          clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+            clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
         } catch (Exception e) {
-          //if playing from the repl and defining functions, file won't exist
+            // if playing from the repl and defining functions, file won't exist
         }
         return (IFn) RT.var(namespace, name).deref();
     }
@@ -494,38 +470,38 @@ public class Utils {
 
     public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
         Map<V, K> ret = new HashMap<V, K>();
-        for(K key: map.keySet()) {
+        for (K key : map.keySet()) {
             ret.put(map.get(key), key);
         }
         return ret;
     }
 
     public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
-        if(topology.get_spouts().containsKey(id)) {
+        if (topology.get_spouts().containsKey(id)) {
             return topology.get_spouts().get(id).get_common();
         }
-        if(topology.get_bolts().containsKey(id)) {
+        if (topology.get_bolts().containsKey(id)) {
             return topology.get_bolts().get(id).get_common();
         }
-        if(topology.get_state_spouts().containsKey(id)) {
+        if (topology.get_state_spouts().containsKey(id)) {
             return topology.get_state_spouts().get(id).get_common();
         }
         throw new IllegalArgumentException("Could not find component with id " + id);
     }
 
     public static Integer getInt(Object o) {
-      Integer result = getInt(o, null);
-      if (null == result) {
-        throw new IllegalArgumentException("Don't know how to convert null to int");
-      }
-      return result;
+        Integer result = getInt(o, null);
+        if (null == result) {
+            throw new IllegalArgumentException("Don't know how to convert null to int");
+        }
+        return result;
     }
-    
+
     public static Integer getInt(Object o, Integer defaultValue) {
         if (null == o) {
             return defaultValue;
         }
-        
+
         if (o instanceof Number) {
             return ((Number) o).intValue();
         } else if (o instanceof String) {
@@ -534,38 +510,18 @@ public class Utils {
             throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
         }
     }
-    
+
     public static long secureRandomLong() {
         return UUID.randomUUID().getLeastSignificantBits();
     }
-	
-    public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry {
-        
-        protected final int maxRetryInterval;
-        
-        public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepTimeMs) {
-            super(baseSleepTimeMs, maxRetries);
-            this.maxRetryInterval = maxSleepTimeMs;
-        }
-        
-        public int getMaxRetryInterval() {
-            return this.maxRetryInterval;
-        }
-        
-        @Override
-        public int getSleepTimeMs(int count, long elapsedMs) {
-            return Math.min(maxRetryInterval, super.getSleepTimeMs(count, elapsedMs));
-        }
-        
-    }
-    
+
     public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
         return newCurator(conf, servers, port, root, null);
     }
 
     public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
         List<String> serverPorts = new ArrayList<String>();
-        for(String zkServer: (List<String>) servers) {
+        for (String zkServer : (List<String>) servers) {
             serverPorts.add(zkServer + ":" + Utils.getInt(port));
         }
         String zkStr = StringUtils.join(serverPorts, ",") + root;
@@ -576,17 +532,15 @@ public class Utils {
         return builder.build();
     }
 
-    protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
-    {
+    protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) {
         builder.connectString(zkStr)
-            .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
-            .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
-            .retryPolicy(new StormBoundedExponentialBackoffRetry(
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
-
-        if(auth!=null && auth.scheme!=null && auth.payload!=null) {
+                .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+                .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+                .retryPolicy(
+                        new StormBoundedExponentialBackoffRetry(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf
+                                .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
+
+        if (auth != null && auth.scheme != null && auth.payload != null) {
             builder = builder.authorization(auth.scheme, auth.payload);
         }
     }
@@ -608,15 +562,10 @@ public class Utils {
     }
 
     /**
-     *
-(defn integer-divided [sum num-pieces]
-  (let [base (int (/ sum num-pieces))
-        num-inc (mod sum num-pieces)
-        num-bases (- num-pieces num-inc)]
-    (if (= num-inc 0)
-      {base num-bases}
-      {base num-bases (inc base) num-inc}
-      )))
+     * 
+     (defn integer-divided [sum num-pieces] (let [base (int (/ sum num-pieces)) num-inc (mod sum num-pieces) num-bases (- num-pieces num-inc)] (if (= num-inc
+     * 0) {base num-bases} {base num-bases (inc base) num-inc} )))
+     * 
      * @param sum
      * @param numPieces
      * @return
@@ -628,8 +577,8 @@ public class Utils {
         int numBases = numPieces - numInc;
         TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
         ret.put(base, numBases);
-        if(numInc!=0) {
-            ret.put(base+1, numInc);
+        if (numInc != 0) {
+            ret.put(base + 1, numInc);
         }
         return ret;
     }
@@ -644,7 +593,7 @@ public class Utils {
         try {
             BufferedReader r = new BufferedReader(new InputStreamReader(in));
             String line = null;
-            while ((line = r.readLine())!= null) {
+            while ((line = r.readLine()) != null) {
                 LOG.info("{}:{}", prefix, line);
             }
         } catch (IOException e) {
@@ -654,8 +603,8 @@ public class Utils {
 
     public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
         Throwable t = throwable;
-        while(t != null) {
-            if(klass.isInstance(t)) {
+        while (t != null) {
+            if (klass.isInstance(t)) {
                 return true;
             }
             t = t.getCause();
@@ -664,71 +613,70 @@ public class Utils {
     }
 
     /**
-     * Is the cluster configured to interact with ZooKeeper in a secure way?
-     * This only works when called from within Nimbus or a Supervisor process.
+     * Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
+     * 
      * @param conf the storm configuration, not the topology configuration
      * @return true if it is configured else false.
      */
     public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
         return null != System.getProperty("java.security.auth.login.config")
-            || (conf != null
-                && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
-                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+                || (conf != null && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null && !((String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
     }
 
     /**
      * Is the topology configured to have ZooKeeper authentication.
+     * 
      * @param conf the topology configuration
      * @return true if ZK is configured else false
      */
     public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
-        return (conf != null
-                && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
-                && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+        return (conf != null && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null && !((String) conf
+                .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
     public static List<ACL> getWorkerACL(Map conf) {
-        //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
+        // This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct
+        // perms
         if (!isZkAuthenticationConfiguredTopology(conf)) {
             return null;
         }
-        String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
+        String stormZKUser = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
-           throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+            throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
-        String[] split = stormZKUser.split(":",2);
+        String[] split = stormZKUser.split(":", 2);
         if (split.length != 2) {
-          throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
         }
         ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
         ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
         return ret;
     }
 
-   public static String threadDump() {
-       final StringBuilder dump = new StringBuilder();
-       final java.lang.management.ThreadMXBean threadMXBean =  java.lang.management.ManagementFactory.getThreadMXBean();
-       final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
-       for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
-           dump.append('"');
-           dump.append(threadInfo.getThreadName());
-           dump.append("\" ");
-           final Thread.State state = threadInfo.getThreadState();
-           dump.append("\n   java.lang.Thread.State: ");
-           dump.append(state);
-           final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
-           for (final StackTraceElement stackTraceElement : stackTraceElements) {
-               dump.append("\n        at ");
-               dump.append(stackTraceElement);
-           }
-           dump.append("\n\n");
-       }
-       return dump.toString();
-   }
+    public static String threadDump() {
+        final StringBuilder dump = new StringBuilder();
+        final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
+        final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+        for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+            dump.append('"');
+            dump.append(threadInfo.getThreadName());
+            dump.append("\" ");
+            final Thread.State state = threadInfo.getThreadState();
+            dump.append("\n   java.lang.Thread.State: ");
+            dump.append(state);
+            final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+            for (final StackTraceElement stackTraceElement : stackTraceElements) {
+                dump.append("\n        at ");
+                dump.append(stackTraceElement);
+            }
+            dump.append("\n\n");
+        }
+        return dump.toString();
+    }
 
     // Assumes caller is synchronizing
     private static SerializationDelegate getSerializationDelegate(Map stormConf) {
-        String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
+        String delegateClassName = (String) stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
         SerializationDelegate delegate;
         try {
             Class delegateClass = Class.forName(delegateClassName);
@@ -747,27 +695,25 @@ public class Utils {
         return delegate;
     }
 
-  public static void handleUncaughtException(Throwable t) {
-    if (t != null && t instanceof Error) {
-      if (t instanceof OutOfMemoryError) {
-        try {
-          System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
-        } catch (Throwable err) {
-          //Again we don't want to exit because of logging issues.
+    public static void handleUncaughtException(Throwable t) {
+        if (t != null && t instanceof Error) {
+            if (t instanceof OutOfMemoryError) {
+                try {
+                    System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
+                } catch (Throwable err) {
+                    // Again we don't want to exit because of logging issues.
+                }
+                Runtime.getRuntime().halt(-1);
+            } else {
+                // Running in daemon mode, we would pass Error to calling thread.
+                throw (Error) t;
+            }
         }
-        Runtime.getRuntime().halt(-1);
-      } else {
-        //Running in daemon mode, we would pass Error to calling thread.
-        throw (Error) t;
-      }
     }
-  }
-
 
-    
     public static List<String> tokenize_path(String path) {
         String[] toks = path.split("/");
-        java.util.ArrayList<String> rtn = new ArrayList<String>();
+        ArrayList<String> rtn = new ArrayList<String>();
         for (String str : toks) {
             if (!str.isEmpty()) {
                 rtn.add(str);
@@ -775,7 +721,7 @@ public class Utils {
         }
         return rtn;
     }
-    
+
     public static String toks_to_path(List<String> toks) {
         StringBuffer buff = new StringBuffer();
         buff.append("/");
@@ -785,16 +731,16 @@ public class Utils {
             if (i < (size - 1)) {
                 buff.append("/");
             }
-            
+
         }
         return buff.toString();
     }
-    
+
     public static String normalize_path(String path) {
         String rtn = toks_to_path(tokenize_path(path));
         return rtn;
     }
-    
+
     public static String printStack() {
         StringBuilder sb = new StringBuilder();
         sb.append("\nCurrent call stack:\n");
@@ -802,14 +748,14 @@ public class Utils {
         for (int i = 2; i < stackElements.length; i++) {
             sb.append("\t").append(stackElements[i]).append("\n");
         }
-        
+
         return sb.toString();
     }
-    
+
     private static Map loadProperty(String prop) {
         Map ret = new HashMap<Object, Object>();
         Properties properties = new Properties();
-        
+
         try {
             InputStream stream = new FileInputStream(prop);
             properties.load(stream);
@@ -826,14 +772,14 @@ public class Utils {
             e1.printStackTrace();
             throw new RuntimeException(e1.getMessage());
         }
-        
+
         return ret;
     }
-    
+
     private static Map loadYaml(String confPath) {
         Map ret = new HashMap<Object, Object>();
         Yaml yaml = new Yaml();
-        
+
         try {
             InputStream stream = new FileInputStream(confPath);
             ret = (Map) yaml.load(stream);
@@ -848,10 +794,10 @@ public class Utils {
             e1.printStackTrace();
             throw new RuntimeException("Failed to read config file");
         }
-        
+
         return ret;
     }
-    
+
     public static Map loadConf(String arg) {
         Map ret = null;
         if (arg.endsWith("yaml")) {
@@ -866,13 +812,11 @@ public class Utils {
         String ret = "";
         InputStream input = null;
         try {
-            input =
-                    Utils.class.getClassLoader().getResourceAsStream("version");
+            input = Utils.class.getClassLoader().getResourceAsStream("version");
             BufferedReader in = new BufferedReader(new InputStreamReader(input));
-			String s = in.readLine();
-			ret = s.trim();
-			
-			
+            String s = in.readLine();
+            ret = s.trim();
+
         } catch (Exception e) {
             LOG.warn("Failed to get version", e);
         } finally {
@@ -892,7 +836,7 @@ public class Utils {
         bytes[offset++] = (byte) (value & 0x000000FF);
         bytes[offset++] = (byte) ((value & 0x0000FF00) >> 8);
         bytes[offset++] = (byte) ((value & 0x00FF0000) >> 16);
-        bytes[offset]   = (byte) ((value & 0xFF000000) >> 24);        
+        bytes[offset] = (byte) ((value & 0xFF000000) >> 24);
     }
 
     public static int readIntFromByteArray(byte[] bytes, int offset) {
@@ -900,7 +844,7 @@ public class Utils {
         ret = ret | (bytes[offset++] & 0x000000FF);
         ret = ret | ((bytes[offset++] << 8) & 0x0000FF00);
         ret = ret | ((bytes[offset++] << 16) & 0x00FF0000);
-        ret = ret | ((bytes[offset]   << 24) & 0xFF000000);
+        ret = ret | ((bytes[offset] << 24) & 0xFF000000);
         return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
index 1740e18..456dfd0 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
@@ -24,108 +24,102 @@ import java.util.Properties;
 
 public class VersionInfo {
 
-  private Properties info;
-
-  protected VersionInfo(String component) {
-    info = new Properties();
-    String versionInfoFile = component + "-version-info.properties";
-    InputStream is = null;
-    try {
-      is = Thread.currentThread().getContextClassLoader()
-        .getResourceAsStream(versionInfoFile);
-      if (is == null) {
-        throw new IOException("Resource not found");
-      }
-      info.load(is);
-    } catch (IOException ex) {
-    } finally {
-      if (is != null) {
-      try {
-
-         is.close();
-     } catch (IOException ioex) {
-     }
-
-     }
-    }
-  }
-
-  protected String _getVersion() {
-    return info.getProperty("version", "Unknown");
-  }
-
-  protected String _getRevision() {
-    return info.getProperty("revision", "Unknown");
-  }
-
-  protected String _getBranch() {
-    return info.getProperty("branch", "Unknown");
-  }
-
-  protected String _getDate() {
-    return info.getProperty("date", "Unknown");
-  }
-
-  protected String _getUser() {
-    return info.getProperty("user", "Unknown");
-  }
-
-  protected String _getUrl() {
-    return info.getProperty("url", "Unknown");
-  }
-
-  protected String _getSrcChecksum() {
-    return info.getProperty("srcChecksum", "Unknown");
-  }
-
-  protected String _getBuildVersion(){
-    return getVersion() +
-      " from " + _getRevision() +
-      " by " + _getUser() +
-      " source checksum " + _getSrcChecksum();
-  }
-
-
-  private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
-
-  public static String getVersion() {
-    return COMMON_VERSION_INFO._getVersion();
-  }
-  
-  public static String getRevision() {
-    return COMMON_VERSION_INFO._getRevision();
-  }
-
-  public static String getBranch() {
-    return COMMON_VERSION_INFO._getBranch();
-  }
-
-  public static String getDate() {
-    return COMMON_VERSION_INFO._getDate();
-  }
-  
-  public static String getUser() {
-    return COMMON_VERSION_INFO._getUser();
-  }
-  
-  public static String getUrl() {
-    return COMMON_VERSION_INFO._getUrl();
-  }
-
-  public static String getSrcChecksum() {
-    return COMMON_VERSION_INFO._getSrcChecksum();
-  }
-
-  public static String getBuildVersion(){
-    return COMMON_VERSION_INFO._getBuildVersion();
-  }
-
-
-  public static void main(String[] args) {
-    System.out.println("Storm " + getVersion());
-    System.out.println("URL " + getUrl() + " -r " + getRevision());
-    System.out.println("Branch " + getBranch());
-    System.out.println("Compiled by " + getUser() + " on " + getDate());
-    System.out.println("From source with checksum " + getSrcChecksum());
-  }
+    private Properties info;
+
+    protected VersionInfo(String component) {
+        info = new Properties();
+        String versionInfoFile = component + "-version-info.properties";
+        InputStream is = null;
+        try {
+            is = Thread.currentThread().getContextClassLoader().getResourceAsStream(versionInfoFile);
+            if (is == null) {
+                throw new IOException("Resource not found");
+            }
+            info.load(is);
+        } catch (IOException ex) {
+        } finally {
+            if (is != null) {
+                try {
+
+                    is.close();
+                } catch (IOException ioex) {
+                }
+
+            }
+        }
+    }
+
+    protected String _getVersion() {
+        return info.getProperty("version", "Unknown");
+    }
+
+    protected String _getRevision() {
+        return info.getProperty("revision", "Unknown");
+    }
+
+    protected String _getBranch() {
+        return info.getProperty("branch", "Unknown");
+    }
+
+    protected String _getDate() {
+        return info.getProperty("date", "Unknown");
+    }
+
+    protected String _getUser() {
+        return info.getProperty("user", "Unknown");
+    }
+
+    protected String _getUrl() {
+        return info.getProperty("url", "Unknown");
+    }
+
+    protected String _getSrcChecksum() {
+        return info.getProperty("srcChecksum", "Unknown");
+    }
+
+    protected String _getBuildVersion() {
+        return getVersion() + " from " + _getRevision() + " by " + _getUser() + " source checksum " + _getSrcChecksum();
+    }
+
+    private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+
+    public static String getVersion() {
+        return COMMON_VERSION_INFO._getVersion();
+    }
+
+    public static String getRevision() {
+        return COMMON_VERSION_INFO._getRevision();
+    }
+
+    public static String getBranch() {
+        return COMMON_VERSION_INFO._getBranch();
+    }
+
+    public static String getDate() {
+        return COMMON_VERSION_INFO._getDate();
+    }
+
+    public static String getUser() {
+        return COMMON_VERSION_INFO._getUser();
+    }
+
+    public static String getUrl() {
+        return COMMON_VERSION_INFO._getUrl();
+    }
+
+    public static String getSrcChecksum() {
+        return COMMON_VERSION_INFO._getSrcChecksum();
+    }
+
+    public static String getBuildVersion() {
+        return COMMON_VERSION_INFO._getBuildVersion();
+    }
+
+    public static void main(String[] args) {
+        System.out.println("Storm " + getVersion());
+        System.out.println("URL " + getUrl() + " -r " + getRevision());
+        System.out.println("Branch " + getBranch());
+        System.out.println("Compiled by " + getUser() + " on " + getDate());
+        System.out.println("From source with checksum " + getSrcChecksum());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
index 07ce5a8..0852292 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
@@ -30,10 +30,10 @@ public class VersionedStore {
     private static final String FINISHED_VERSION_SUFFIX = ".version";
 
     private String _root;
-    
+
     public VersionedStore(String path) throws IOException {
-      _root = path;
-      mkdirs(_root);
+        _root = path;
+        mkdirs(_root);
     }
 
     public String getRoot() {
@@ -46,26 +46,30 @@ public class VersionedStore {
 
     public String mostRecentVersionPath() throws IOException {
         Long v = mostRecentVersion();
-        if(v==null) return null;
+        if (v == null)
+            return null;
         return versionPath(v);
     }
 
     public String mostRecentVersionPath(long maxVersion) throws IOException {
         Long v = mostRecentVersion(maxVersion);
-        if(v==null) return null;
+        if (v == null)
+            return null;
         return versionPath(v);
     }
 
     public Long mostRecentVersion() throws IOException {
         List<Long> all = getAllVersions();
-        if(all.size()==0) return null;
+        if (all.size() == 0)
+            return null;
         return all.get(0);
     }
 
     public Long mostRecentVersion(long maxVersion) throws IOException {
         List<Long> all = getAllVersions();
-        for(Long v: all) {
-            if(v <= maxVersion) return v;
+        for (Long v : all) {
+            if (v <= maxVersion)
+                return v;
         }
         return null;
     }
@@ -73,7 +77,7 @@ public class VersionedStore {
     public String createVersion() throws IOException {
         Long mostRecent = mostRecentVersion();
         long version = Time.currentTimeMillis();
-        if(mostRecent!=null && version <= mostRecent) {
+        if (mostRecent != null && version <= mostRecent) {
             version = mostRecent + 1;
         }
         return createVersion(version);
@@ -81,7 +85,7 @@ public class VersionedStore {
 
     public String createVersion(long version) throws IOException {
         String ret = versionPath(version);
-        if(getAllVersions().contains(version))
+        if (getAllVersions().contains(version))
             throw new RuntimeException("Version already exists or data already exists");
         else
             return ret;
@@ -95,11 +99,11 @@ public class VersionedStore {
         File versionFile = new File(versionPath(version));
         File tokenFile = new File(tokenPath(version));
 
-        if(tokenFile.exists()) {
+        if (tokenFile.exists()) {
             FileUtils.forceDelete(tokenFile);
         }
 
-        if(versionFile.exists()) {
+        if (versionFile.exists()) {
             FileUtils.forceDelete(versionFile);
         }
     }
@@ -116,14 +120,14 @@ public class VersionedStore {
 
     public void cleanup(int versionsToKeep) throws IOException {
         List<Long> versions = getAllVersions();
-        if(versionsToKeep >= 0) {
+        if (versionsToKeep >= 0) {
             versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
         }
         HashSet<Long> keepers = new HashSet<Long>(versions);
 
-        for(String p: listDir(_root)) {
+        for (String p : listDir(_root)) {
             Long v = parseVersion(p);
-            if(v!=null && !keepers.contains(v)) {
+            if (v != null && !keepers.contains(v)) {
                 deleteVersion(v);
             }
         }
@@ -134,8 +138,8 @@ public class VersionedStore {
      */
     public List<Long> getAllVersions() throws IOException {
         List<Long> ret = new ArrayList<Long>();
-        for(String s: listDir(_root)) {
-            if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
+        for (String s : listDir(_root)) {
+            if (s.endsWith(FINISHED_VERSION_SUFFIX)) {
                 ret.add(validateAndGetVersion(s));
             }
         }
@@ -150,18 +154,19 @@ public class VersionedStore {
 
     private long validateAndGetVersion(String path) {
         Long v = parseVersion(path);
-        if(v==null) throw new RuntimeException(path + " is not a valid version");
+        if (v == null)
+            throw new RuntimeException(path + " is not a valid version");
         return v;
     }
 
     private Long parseVersion(String path) {
         String name = new File(path).getName();
-        if(name.endsWith(FINISHED_VERSION_SUFFIX)) {
-            name = name.substring(0, name.length()-FINISHED_VERSION_SUFFIX.length());
+        if (name.endsWith(FINISHED_VERSION_SUFFIX)) {
+            name = name.substring(0, name.length() - FINISHED_VERSION_SUFFIX.length());
         }
         try {
             return Long.parseLong(name);
-        } catch(NumberFormatException e) {
+        } catch (NumberFormatException e) {
             return null;
         }
     }
@@ -173,12 +178,12 @@ public class VersionedStore {
     private void mkdirs(String path) throws IOException {
         new File(path).mkdirs();
     }
-    
+
     private List<String> listDir(String dir) throws IOException {
         List<String> ret = new ArrayList<String>();
         File[] contents = new File(dir).listFiles();
-        if(contents!=null) {
-            for(File f: contents) {
+        if (contents != null) {
+            for (File f : contents) {
                 ret.add(f.getAbsolutePath());
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
index 5a288a0..6290f5a 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
@@ -22,28 +22,28 @@ public class WindowedTimeThrottler {
     int _maxAmt;
     long _windowStartTime;
     int _windowEvents = 0;
-    
+
     public WindowedTimeThrottler(Number windowMillis, Number maxAmt) {
         _windowMillis = windowMillis.longValue();
         _maxAmt = maxAmt.intValue();
         _windowStartTime = System.currentTimeMillis();
     }
-    
+
     public boolean isThrottled() {
         resetIfNecessary();
         return _windowEvents >= _maxAmt;
     }
-    
-    //returns void if the event should continue, false if the event should not be done
+
+    // returns void if the event should continue, false if the event should not be done
     public void markEvent() {
         resetIfNecessary();
         _windowEvents++;
-        
+
     }
-    
+
     private void resetIfNecessary() {
         long now = System.currentTimeMillis();
-        if(now - _windowStartTime > _windowMillis) {
+        if (now - _windowStartTime > _windowMillis) {
             _windowStartTime = now;
             _windowEvents = 0;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
index f3526b1..4c2f35c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
@@ -28,30 +28,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class WorkerClassLoader extends URLClassLoader {
-    
+
     public static Logger LOG = LoggerFactory.getLogger(WorkerClassLoader.class);
-    
+
     private ClassLoader defaultClassLoader;
-    
+
     private ClassLoader JDKClassLoader;
-    
+
     private boolean isDebug;
-    
+
     protected static WorkerClassLoader instance;
-    
+
     protected static boolean enable;
-    
+
     protected static Map<Thread, ClassLoader> threadContextCache;
-    
+
     protected WorkerClassLoader(URL[] urls, ClassLoader defaultClassLoader, ClassLoader JDKClassLoader, boolean isDebug) {
         super(urls, JDKClassLoader);
         this.defaultClassLoader = defaultClassLoader;
         this.JDKClassLoader = JDKClassLoader;
         this.isDebug = isDebug;
-        
+
         // TODO Auto-generated constructor stub
     }
-    
+
     // for all log go through logback when enable classloader
     protected boolean isLogByDefault(String name) {
         if (name.startsWith("org.apache.log4j")) {
@@ -59,11 +59,11 @@ public class WorkerClassLoader extends URLClassLoader {
         } else if (name.startsWith("org.slf4j")) {
             return true;
         }
-        
+
         return false;
-        
+
     }
-    
+
     protected boolean isLoadByDefault(String name) {
         if (name.startsWith("backtype.storm") == true) {
             return true;
@@ -75,100 +75,101 @@ public class WorkerClassLoader extends URLClassLoader {
             return false;
         }
     }
-    
+
     @Override
     public Class<?> loadClass(String name) throws ClassNotFoundException {
         Class<?> result = null;
         try {
             result = this.findLoadedClass(name);
-            
+
             if (result != null) {
                 return result;
             }
-            
+
             try {
                 result = JDKClassLoader.loadClass(name);
                 if (result != null)
                     return result;
             } catch (Exception e) {
-                
+
             }
-            
+
             try {
                 if (isLoadByDefault(name) == false) {
                     result = findClass(name);
-                    
+
                     if (result != null) {
                         return result;
                     }
                 }
-                
+
             } catch (Exception e) {
-                
+
             }
-            
+
             result = defaultClassLoader.loadClass(name);
             return result;
-            
+
         } finally {
             if (result != null) {
                 ClassLoader resultClassLoader = result.getClassLoader();
-                LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader());
+                LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:"
+                        + Thread.currentThread().getContextClassLoader());
             } else {
                 LOG.warn("Failed to load class " + name + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader());
             }
-            
+
             if (isDebug) {
                 LOG.info(Utils.printStack());
             }
         }
-        
+
     }
-    
+
     public static WorkerClassLoader mkInstance(URL[] urls, ClassLoader DefaultClassLoader, ClassLoader JDKClassLoader, boolean enable, boolean isDebug) {
         WorkerClassLoader.enable = enable;
         if (enable == false) {
             LOG.info("Don't enable UserDefine ClassLoader");
             return null;
         }
-        
+
         synchronized (WorkerClassLoader.class) {
             if (instance == null) {
                 instance = new WorkerClassLoader(urls, DefaultClassLoader, JDKClassLoader, isDebug);
-                
+
                 threadContextCache = new ConcurrentHashMap<Thread, ClassLoader>();
             }
-            
+
         }
-        
+
         LOG.info("Successfully create classloader " + mk_list(urls));
         return instance;
     }
-    
+
     public static WorkerClassLoader getInstance() {
         return instance;
     }
-    
+
     public static boolean isEnable() {
         return enable;
     }
-    
+
     public static void switchThreadContext() {
         if (enable == false) {
             return;
         }
-        
+
         Thread thread = Thread.currentThread();
         ClassLoader oldClassLoader = thread.getContextClassLoader();
         threadContextCache.put(thread, oldClassLoader);
         thread.setContextClassLoader(instance);
     }
-    
+
     public static void restoreThreadContext() {
         if (enable == false) {
             return;
         }
-        
+
         Thread thread = Thread.currentThread();
         ClassLoader oldClassLoader = threadContextCache.get(thread);
         if (oldClassLoader != null) {
@@ -177,7 +178,7 @@ public class WorkerClassLoader extends URLClassLoader {
             LOG.info("No context classloader of " + thread.getName());
         }
     }
-    
+
     private static <V> List<V> mk_list(V... args) {
         ArrayList<V> rtn = new ArrayList<V>();
         for (V o : args) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
index 8516f97..2c0a2a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
@@ -42,334 +42,314 @@ package backtype.storm.utils;
 
 import java.io.*;
 
-
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
-public final class WritableUtils  {
-
-  public static byte[] readCompressedByteArray(DataInput in) throws IOException {
-    int length = in.readInt();
-    if (length == -1) return null;
-    byte[] buffer = new byte[length];
-    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
-    GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
-    byte[] outbuf = new byte[length];
-    ByteArrayOutputStream bos =  new ByteArrayOutputStream();
-    int len;
-    while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
-      bos.write(outbuf, 0, len);
-    }
-    byte[] decompressed =  bos.toByteArray();
-    bos.close();
-    gzi.close();
-    return decompressed;
-  }
-
-  public static void skipCompressedByteArray(DataInput in) throws IOException {
-    int length = in.readInt();
-    if (length != -1) {
-      skipFully(in, length);
-    }
-  }
-
-  public static int  writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
-    if (bytes != null) {
-      ByteArrayOutputStream bos =  new ByteArrayOutputStream();
-      GZIPOutputStream gzout = new GZIPOutputStream(bos);
-      gzout.write(bytes, 0, bytes.length);
-      gzout.close();
-      byte[] buffer = bos.toByteArray();
-      int len = buffer.length;
-      out.writeInt(len);
-      out.write(buffer, 0, len);
-      /* debug only! Once we have confidence, can lose this. */
-      return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
-    } else {
-      out.writeInt(-1);
-      return -1;
+public final class WritableUtils {
+
+    public static byte[] readCompressedByteArray(DataInput in) throws IOException {
+        int length = in.readInt();
+        if (length == -1)
+            return null;
+        byte[] buffer = new byte[length];
+        in.readFully(buffer); // could/should use readFully(buffer,0,length)?
+        GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
+        byte[] outbuf = new byte[length];
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        int len;
+        while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) {
+            bos.write(outbuf, 0, len);
+        }
+        byte[] decompressed = bos.toByteArray();
+        bos.close();
+        gzi.close();
+        return decompressed;
     }
-  }
-
-
-  /* Ugly utility, maybe someone else can do this better  */
-  public static String readCompressedString(DataInput in) throws IOException {
-    byte[] bytes = readCompressedByteArray(in);
-    if (bytes == null) return null;
-    return new String(bytes, "UTF-8");
-  }
-
-
-  public static int  writeCompressedString(DataOutput out, String s) throws IOException {
-    return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
-  }
-
-  /*
-   *
-   * Write a String as a Network Int n, followed by n Bytes
-   * Alternative to 16 bit read/writeUTF.
-   * Encoding standard is... ?
-   *
-   */
-  public static void writeString(DataOutput out, String s) throws IOException {
-    if (s != null) {
-      byte[] buffer = s.getBytes("UTF-8");
-      int len = buffer.length;
-      out.writeInt(len);
-      out.write(buffer, 0, len);
-    } else {
-      out.writeInt(-1);
-    }
-  }
-
-  /*
-   * Read a String as a Network Int n, followed by n Bytes
-   * Alternative to 16 bit read/writeUTF.
-   * Encoding standard is... ?
-   *
-   */
-  public static String readString(DataInput in) throws IOException{
-    int length = in.readInt();
-    if (length == -1) return null;
-    byte[] buffer = new byte[length];
-    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
-    return new String(buffer,"UTF-8");
-  }
-
-
-  /*
-   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
-   * Could be generalised using introspection.
-   *
-   */
-  public static void writeStringArray(DataOutput out, String[] s) throws IOException{
-    out.writeInt(s.length);
-    for(int i = 0; i < s.length; i++) {
-      writeString(out, s[i]);
+
+    public static void skipCompressedByteArray(DataInput in) throws IOException {
+        int length = in.readInt();
+        if (length != -1) {
+            skipFully(in, length);
+        }
     }
-  }
-
-  /*
-   * Write a String array as a Nework Int N, followed by Int N Byte Array of
-   * compressed Strings. Handles also null arrays and null values.
-   * Could be generalised using introspection.
-   *
-   */
-  public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
-    if (s == null) {
-      out.writeInt(-1);
-      return;
+
+    public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
+        if (bytes != null) {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            GZIPOutputStream gzout = new GZIPOutputStream(bos);
+            gzout.write(bytes, 0, bytes.length);
+            gzout.close();
+            byte[] buffer = bos.toByteArray();
+            int len = buffer.length;
+            out.writeInt(len);
+            out.write(buffer, 0, len);
+            /* debug only! Once we have confidence, can lose this. */
+            return ((bytes.length != 0) ? (100 * buffer.length) / bytes.length : 0);
+        } else {
+            out.writeInt(-1);
+            return -1;
+        }
     }
-    out.writeInt(s.length);
-    for(int i = 0; i < s.length; i++) {
-      writeCompressedString(out, s[i]);
+
+    /* Ugly utility, maybe someone else can do this better */
+    public static String readCompressedString(DataInput in) throws IOException {
+        byte[] bytes = readCompressedByteArray(in);
+        if (bytes == null)
+            return null;
+        return new String(bytes, "UTF-8");
     }
-  }
-
-  /*
-   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
-   * Could be generalised using introspection. Actually this bit couldn't...
-   *
-   */
-  public static String[] readStringArray(DataInput in) throws IOException {
-    int len = in.readInt();
-    if (len == -1) return null;
-    String[] s = new String[len];
-    for(int i = 0; i < len; i++) {
-      s[i] = readString(in);
+
+    public static int writeCompressedString(DataOutput out, String s) throws IOException {
+        return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
     }
-    return s;
-  }
-
-
-  /*
-   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
-   * Could be generalised using introspection. Handles null arrays and null values.
-   *
-   */
-  public static  String[] readCompressedStringArray(DataInput in) throws IOException {
-    int len = in.readInt();
-    if (len == -1) return null;
-    String[] s = new String[len];
-    for(int i = 0; i < len; i++) {
-      s[i] = readCompressedString(in);
+
+    /*
+     * 
+     * Write a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ?
+     */
+    public static void writeString(DataOutput out, String s) throws IOException {
+        if (s != null) {
+            byte[] buffer = s.getBytes("UTF-8");
+            int len = buffer.length;
+            out.writeInt(len);
+            out.write(buffer, 0, len);
+        } else {
+            out.writeInt(-1);
+        }
     }
-    return s;
-  }
-
-
-  /*
-   *
-   * Test Utility Method Display Byte Array.
-   *
-   */
-  public static void displayByteArray(byte[] record){
-    int i;
-    for(i=0;i < record.length -1; i++){
-      if (i % 16 == 0) { System.out.println(); }
-      System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
-      System.out.print(Integer.toHexString(record[i] & 0x0F));
-      System.out.print(",");
+
+    /*
+     * Read a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ?
+     */
+    public static String readString(DataInput in) throws IOException {
+        int length = in.readInt();
+        if (length == -1)
+            return null;
+        byte[] buffer = new byte[length];
+        in.readFully(buffer); // could/should use readFully(buffer,0,length)?
+        return new String(buffer, "UTF-8");
     }
-    System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
-    System.out.print(Integer.toHexString(record[i] & 0x0F));
-    System.out.println();
-  }
-
-
-  /**
-   * Serializes an integer to a binary stream with zero-compressed encoding.
-   * For -120 <= i <= 127, only one byte is used with the actual value.
-   * For other values of i, the first byte value indicates whether the
-   * integer is positive or negative, and the number of bytes that follow.
-   * If the first byte value v is between -121 and -124, the following integer
-   * is positive, with number of bytes that follow are -(v+120).
-   * If the first byte value v is between -125 and -128, the following integer
-   * is negative, with number of bytes that follow are -(v+124). Bytes are
-   * stored in the high-non-zero-byte-first order.
-   *
-   * @param stream Binary output stream
-   * @param i Integer to be serialized
-   * @throws java.io.IOException
-   */
-  public static void writeVInt(DataOutput stream, int i) throws IOException {
-    writeVLong(stream, i);
-  }
-
-  /**
-   * Serializes a long to a binary stream with zero-compressed encoding.
-   * For -112 <= i <= 127, only one byte is used with the actual value.
-   * For other values of i, the first byte value indicates whether the
-   * long is positive or negative, and the number of bytes that follow.
-   * If the first byte value v is between -113 and -120, the following long
-   * is positive, with number of bytes that follow are -(v+112).
-   * If the first byte value v is between -121 and -128, the following long
-   * is negative, with number of bytes that follow are -(v+120). Bytes are
-   * stored in the high-non-zero-byte-first order.
-   *
-   * @param stream Binary output stream
-   * @param i Long to be serialized
-   * @throws java.io.IOException
-   */
-  public static void writeVLong(DataOutput stream, long i) throws IOException {
-    if (i >= -112 && i <= 127) {
-      stream.writeByte((byte)i);
-      return;
+
+    /*
+     * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection.
+     */
+    public static void writeStringArray(DataOutput out, String[] s) throws IOException {
+        out.writeInt(s.length);
+        for (int i = 0; i < s.length; i++) {
+            writeString(out, s[i]);
+        }
     }
 
-    int len = -112;
-    if (i < 0) {
-      i ^= -1L; // take one's complement'
-      len = -120;
+    /*
+     * Write a String array as a Nework Int N, followed by Int N Byte Array of compressed Strings. Handles also null arrays and null values. Could be
+     * generalised using introspection.
+     */
+    public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException {
+        if (s == null) {
+            out.writeInt(-1);
+            return;
+        }
+        out.writeInt(s.length);
+        for (int i = 0; i < s.length; i++) {
+            writeCompressedString(out, s[i]);
+        }
     }
 
-    long tmp = i;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
+    /*
+     * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection. Actually this bit couldn't...
+     */
+    public static String[] readStringArray(DataInput in) throws IOException {
+        int len = in.readInt();
+        if (len == -1)
+            return null;
+        String[] s = new String[len];
+        for (int i = 0; i < len; i++) {
+            s[i] = readString(in);
+        }
+        return s;
     }
 
-    stream.writeByte((byte)len);
+    /*
+     * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. Could be generalised using introspection. Handles null arrays and null
+     * values.
+     */
+    public static String[] readCompressedStringArray(DataInput in) throws IOException {
+        int len = in.readInt();
+        if (len == -1)
+            return null;
+        String[] s = new String[len];
+        for (int i = 0; i < len; i++) {
+            s[i] = readCompressedString(in);
+        }
+        return s;
+    }
 
-    len = (len < -120) ? -(len + 120) : -(len + 112);
+    /*
+     * 
+     * Test Utility Method Display Byte Array.
+     */
+    public static void displayByteArray(byte[] record) {
+        int i;
+        for (i = 0; i < record.length - 1; i++) {
+            if (i % 16 == 0) {
+                System.out.println();
+            }
+            System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
+            System.out.print(Integer.toHexString(record[i] & 0x0F));
+            System.out.print(",");
+        }
+        System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
+        System.out.print(Integer.toHexString(record[i] & 0x0F));
+        System.out.println();
+    }
 
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      long mask = 0xFFL << shiftbits;
-      stream.writeByte((byte)((i & mask) >> shiftbits));
+    /**
+     * Serializes an integer to a binary stream with zero-compressed encoding. For -120 <= i <= 127, only one byte is used with the actual value. For other
+     * values of i, the first byte value indicates whether the integer is positive or negative, and the number of bytes that follow. If the first byte value v
+     * is between -121 and -124, the following integer is positive, with number of bytes that follow are -(v+120). If the first byte value v is between -125 and
+     * -128, the following integer is negative, with number of bytes that follow are -(v+124). Bytes are stored in the high-non-zero-byte-first order.
+     * 
+     * @param stream Binary output stream
+     * @param i Integer to be serialized
+     * @throws IOException
+     */
+    public static void writeVInt(DataOutput stream, int i) throws IOException {
+        writeVLong(stream, i);
     }
-  }
-
-
-  /**
-   * Reads a zero-compressed encoded long from input stream and returns it.
-   * @param stream Binary input stream
-   * @throws java.io.IOException
-   * @return deserialized long from stream.
-   */
-  public static long readVLong(DataInput stream) throws IOException {
-    byte firstByte = stream.readByte();
-    int len = decodeVIntSize(firstByte);
-    if (len == 1) {
-      return firstByte;
+
+    /**
+     * Serializes a long to a binary stream with zero-compressed encoding. For -112 <= i <= 127, only one byte is used with the actual value. For other values
+     * of i, the first byte value indicates whether the long is positive or negative, and the number of bytes that follow. If the first byte value v is between
+     * -113 and -120, the following long is positive, with number of bytes that follow are -(v+112). If the first byte value v is between -121 and -128, the
+     * following long is negative, with number of bytes that follow are -(v+120). Bytes are stored in the high-non-zero-byte-first order.
+     * 
+     * @param stream Binary output stream
+     * @param i Long to be serialized
+     * @throws IOException
+     */
+    public static void writeVLong(DataOutput stream, long i) throws IOException {
+        if (i >= -112 && i <= 127) {
+            stream.writeByte((byte) i);
+            return;
+        }
+
+        int len = -112;
+        if (i < 0) {
+            i ^= -1L; // take one's complement'
+            len = -120;
+        }
+
+        long tmp = i;
+        while (tmp != 0) {
+            tmp = tmp >> 8;
+            len--;
+        }
+
+        stream.writeByte((byte) len);
+
+        len = (len < -120) ? -(len + 120) : -(len + 112);
+
+        for (int idx = len; idx != 0; idx--) {
+            int shiftbits = (idx - 1) * 8;
+            long mask = 0xFFL << shiftbits;
+            stream.writeByte((byte) ((i & mask) >> shiftbits));
+        }
     }
-    long i = 0;
-    for (int idx = 0; idx < len-1; idx++) {
-      byte b = stream.readByte();
-      i = i << 8;
-      i = i | (b & 0xFF);
+
+    /**
+     * Reads a zero-compressed encoded long from input stream and returns it.
+     * 
+     * @param stream Binary input stream
+     * @throws IOException
+     * @return deserialized long from stream.
+     */
+    public static long readVLong(DataInput stream) throws IOException {
+        byte firstByte = stream.readByte();
+        int len = decodeVIntSize(firstByte);
+        if (len == 1) {
+            return firstByte;
+        }
+        long i = 0;
+        for (int idx = 0; idx < len - 1; idx++) {
+            byte b = stream.readByte();
+            i = i << 8;
+            i = i | (b & 0xFF);
+        }
+        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
     }
-    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
-  }
-
-  /**
-   * Reads a zero-compressed encoded integer from input stream and returns it.
-   * @param stream Binary input stream
-   * @throws java.io.IOException
-   * @return deserialized integer from stream.
-   */
-  public static int readVInt(DataInput stream) throws IOException {
-    return (int) readVLong(stream);
-  }
-
-  /**
-   * Given the first byte of a vint/vlong, determine the sign
-   * @param value the first byte
-   * @return is the value negative
-   */
-  public static boolean isNegativeVInt(byte value) {
-    return value < -120 || (value >= -112 && value < 0);
-  }
-
-  /**
-   * Parse the first byte of a vint/vlong to determine the number of bytes
-   * @param value the first byte of the vint/vlong
-   * @return the total number of bytes (1 to 9)
-   */
-  public static int decodeVIntSize(byte value) {
-    if (value >= -112) {
-      return 1;
-    } else if (value < -120) {
-      return -119 - value;
+
+    /**
+     * Reads a zero-compressed encoded integer from input stream and returns it.
+     * 
+     * @param stream Binary input stream
+     * @throws IOException
+     * @return deserialized integer from stream.
+     */
+    public static int readVInt(DataInput stream) throws IOException {
+        return (int) readVLong(stream);
     }
-    return -111 - value;
-  }
-
-  /**
-   * Get the encoded length if an integer is stored in a variable-length format
-   * @return the encoded length
-   */
-  public static int getVIntSize(long i) {
-    if (i >= -112 && i <= 127) {
-      return 1;
+
+    /**
+     * Given the first byte of a vint/vlong, determine the sign
+     * 
+     * @param value the first byte
+     * @return is the value negative
+     */
+    public static boolean isNegativeVInt(byte value) {
+        return value < -120 || (value >= -112 && value < 0);
     }
 
-    if (i < 0) {
-      i ^= -1L; // take one's complement'
+    /**
+     * Parse the first byte of a vint/vlong to determine the number of bytes
+     * 
+     * @param value the first byte of the vint/vlong
+     * @return the total number of bytes (1 to 9)
+     */
+    public static int decodeVIntSize(byte value) {
+        if (value >= -112) {
+            return 1;
+        } else if (value < -120) {
+            return -119 - value;
+        }
+        return -111 - value;
     }
-    // find the number of bytes with non-leading zeros
-    int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
-    // find the number of data bytes + length byte
-    return (dataBits + 7) / 8 + 1;
-  }
-
-  /**
-   * Skip <i>len</i> number of bytes in input stream<i>in</i>
-   * @param in input stream
-   * @param len number of bytes to skip
-   * @throws IOException when skipped less number of bytes
-   */
-  public static void skipFully(DataInput in, int len) throws IOException {
-    int total = 0;
-    int cur = 0;
-
-    while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
-        total += cur;
+
+    /**
+     * Get the encoded length if an integer is stored in a variable-length format
+     * 
+     * @return the encoded length
+     */
+    public static int getVIntSize(long i) {
+        if (i >= -112 && i <= 127) {
+            return 1;
+        }
+
+        if (i < 0) {
+            i ^= -1L; // take one's complement'
+        }
+        // find the number of bytes with non-leading zeros
+        int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
+        // find the number of data bytes + length byte
+        return (dataBits + 7) / 8 + 1;
     }
 
-    if (total<len) {
-      throw new IOException("Not able to skip " + len + " bytes, possibly " +
-                            "due to end of input.");
+    /**
+     * Skip <i>len</i> number of bytes in input stream<i>in</i>
+     * 
+     * @param in input stream
+     * @param len number of bytes to skip
+     * @throws IOException when skipped less number of bytes
+     */
+    public static void skipFully(DataInput in, int len) throws IOException {
+        int total = 0;
+        int cur = 0;
+
+        while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) {
+            total += cur;
+        }
+
+        if (total < len) {
+            throw new IOException("Not able to skip " + len + " bytes, possibly " + "due to end of input.");
+        }
     }
-  }
 }