You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:04 UTC
[26/51] [partial] storm git commit: Update JStorm to latest release 2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Time.java b/jstorm-core/src/main/java/backtype/storm/utils/Time.java
index 50a79fd..8732008 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Time.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Time.java
@@ -24,86 +24,87 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class Time {
- public static Logger LOG = LoggerFactory.getLogger(Time.class);
-
+ public static Logger LOG = LoggerFactory.getLogger(Time.class);
+
private static AtomicBoolean simulating = new AtomicBoolean(false);
- //TODO: should probably use weak references here or something
+ // TODO: should probably use weak references here or something
private static volatile Map<Thread, AtomicLong> threadSleepTimes;
private static final Object sleepTimesLock = new Object();
-
- private static AtomicLong simulatedCurrTimeMs; //should this be a thread local that's allowed to keep advancing?
-
+
+ private static AtomicLong simulatedCurrTimeMs; // should this be a thread local that's allowed to keep advancing?
+
public static void startSimulating() {
- synchronized(sleepTimesLock) {
+ synchronized (sleepTimesLock) {
simulating.set(true);
simulatedCurrTimeMs = new AtomicLong(0);
threadSleepTimes = new ConcurrentHashMap<Thread, AtomicLong>();
}
}
-
+
public static void stopSimulating() {
- synchronized(sleepTimesLock) {
- simulating.set(false);
- threadSleepTimes = null;
+ synchronized (sleepTimesLock) {
+ simulating.set(false);
+ threadSleepTimes = null;
}
}
-
+
public static boolean isSimulating() {
return simulating.get();
}
-
+
public static void sleepUntil(long targetTimeMs) throws InterruptedException {
- if(simulating.get()) {
+ if (simulating.get()) {
try {
- synchronized(sleepTimesLock) {
+ synchronized (sleepTimesLock) {
threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
}
- while(simulatedCurrTimeMs.get() < targetTimeMs) {
+ while (simulatedCurrTimeMs.get() < targetTimeMs) {
Thread.sleep(10);
}
} finally {
- synchronized(sleepTimesLock) {
+ synchronized (sleepTimesLock) {
if (simulating.get()) {
threadSleepTimes.remove(Thread.currentThread());
}
}
}
} else {
- long sleepTime = targetTimeMs-currentTimeMillis();
- if(sleepTime>0)
+ long sleepTime = targetTimeMs - currentTimeMillis();
+ if (sleepTime > 0)
Thread.sleep(sleepTime);
}
}
-
+
public static void sleep(long ms) throws InterruptedException {
- sleepUntil(currentTimeMillis()+ms);
+ sleepUntil(currentTimeMillis() + ms);
}
-
+
public static long currentTimeMillis() {
- if(simulating.get()) {
+ if (simulating.get()) {
return simulatedCurrTimeMs.get();
} else {
return System.currentTimeMillis();
}
}
-
+
public static int currentTimeSecs() {
return (int) (currentTimeMillis() / 1000);
}
-
+
public static void advanceTime(long ms) {
- if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+ if (!simulating.get())
+ throw new IllegalStateException("Cannot simulate time unless in simulation mode");
simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
}
-
+
public static boolean isThreadWaiting(Thread t) {
- if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+ if (!simulating.get())
+ throw new IllegalStateException("Must be in simulation mode");
AtomicLong time;
- synchronized(sleepTimesLock) {
+ synchronized (sleepTimesLock) {
time = threadSleepTimes.get(t);
}
- return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
- }
+ return !t.isAlive() || time != null && currentTimeMillis() < time.longValue();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
index f0a194f..a29a954 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TimeCacheMap.java
@@ -37,18 +37,18 @@ import java.util.Set;
public class TimeCacheMap<K, V> {
// this default ensures things expire at most 50% past the expiration time
private static final int DEFAULT_NUM_BUCKETS = 3;
-
+
@Deprecated
public static interface ExpiredCallback<K, V> {
public void expire(K key, V val);
}
-
+
private LinkedList<HashMap<K, V>> _buckets;
-
+
private final Object _lock = new Object();
private Thread _cleaner;
private ExpiredCallback _callback;
-
+
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) {
if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
@@ -57,7 +57,7 @@ public class TimeCacheMap<K, V> {
for (int i = 0; i < numBuckets; i++) {
_buckets.add(new HashMap<K, V>());
}
-
+
_callback = callback;
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets - 1);
@@ -78,26 +78,26 @@ public class TimeCacheMap<K, V> {
}
}
} catch (InterruptedException ex) {
-
+
}
}
});
_cleaner.setDaemon(true);
_cleaner.start();
}
-
+
public TimeCacheMap(int expirationSecs, ExpiredCallback<K, V> callback) {
this(expirationSecs, DEFAULT_NUM_BUCKETS, callback);
}
-
+
public TimeCacheMap(int expirationSecs) {
this(expirationSecs, DEFAULT_NUM_BUCKETS);
}
-
+
public TimeCacheMap(int expirationSecs, int numBuckets) {
this(expirationSecs, numBuckets, null);
}
-
+
public boolean containsKey(K key) {
synchronized (_lock) {
for (HashMap<K, V> bucket : _buckets) {
@@ -108,7 +108,7 @@ public class TimeCacheMap<K, V> {
return false;
}
}
-
+
public V get(K key) {
synchronized (_lock) {
for (HashMap<K, V> bucket : _buckets) {
@@ -119,7 +119,7 @@ public class TimeCacheMap<K, V> {
return null;
}
}
-
+
public void put(K key, V value) {
synchronized (_lock) {
Iterator<HashMap<K, V>> it = _buckets.iterator();
@@ -131,7 +131,7 @@ public class TimeCacheMap<K, V> {
}
}
}
-
+
public Object remove(K key) {
synchronized (_lock) {
for (HashMap<K, V> bucket : _buckets) {
@@ -142,7 +142,7 @@ public class TimeCacheMap<K, V> {
return null;
}
}
-
+
public int size() {
synchronized (_lock) {
int size = 0;
@@ -152,11 +152,11 @@ public class TimeCacheMap<K, V> {
return size;
}
}
-
+
public void cleanup() {
_cleaner.interrupt();
}
-
+
public Set<K> keySet() {
Set<K> ret = new HashSet<K>();
synchronized (_lock) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
index 4638117..48b39b7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TransferDrainer.java
@@ -26,88 +26,88 @@ import backtype.storm.messaging.TaskMessage;
public class TransferDrainer {
- private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
-
- public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
- for (String key : workerTupleSetMap.keySet()) {
-
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
- if (null == bundle) {
- bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundles.put(key, bundle);
- }
-
- ArrayList tupleSet = workerTupleSetMap.get(key);
- if (null != tupleSet && tupleSet.size() > 0) {
- bundle.add(tupleSet);
- }
- }
- }
-
- public void send(HashMap<String, IConnection> connections) {
- for (String hostPort : bundles.keySet()) {
- IConnection connection = connections.get(hostPort);
- if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
- for (ArrayList<TaskMessage> list : bundle) {
- connection.send(list);
+ private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+
+ public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
+ for (String key : workerTupleSetMap.keySet()) {
+
+ ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+ if (null == bundle) {
+ bundle = new ArrayList<ArrayList<TaskMessage>>();
+ bundles.put(key, bundle);
+ }
+
+ ArrayList tupleSet = workerTupleSetMap.get(key);
+ if (null != tupleSet && tupleSet.size() > 0) {
+ bundle.add(tupleSet);
+ }
}
-
- }
- }
- }
-
- private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-
- if (null == bundle) {
- return null;
}
-
- return new Iterator<TaskMessage> () {
-
- private int offset = 0;
- private int size = 0;
- {
- for (ArrayList<TaskMessage> list : bundle) {
- size += list.size();
- }
- }
-
- private int bundleOffset = 0;
- private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-
- @Override
- public boolean hasNext() {
- if (offset < size) {
- return true;
- }
- return false;
- }
-
- @Override
- public TaskMessage next() {
- TaskMessage msg = null;
- if (iter.hasNext()) {
- msg = iter.next();
- } else {
- bundleOffset++;
- iter = bundle.get(bundleOffset).iterator();
- msg = iter.next();
+
+ public void send(HashMap<String, IConnection> connections) {
+ for (String hostPort : bundles.keySet()) {
+ IConnection connection = connections.get(hostPort);
+ if (null != connection) {
+ ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+ for (ArrayList<TaskMessage> list : bundle) {
+ connection.send(list);
+ }
+
+ }
}
- if (null != msg) {
- offset++;
+ }
+
+ private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
+
+ if (null == bundle) {
+ return null;
}
- return msg;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("not supported");
- }
- };
- }
-
- public void clear() {
- bundles.clear();
- }
+
+ return new Iterator<TaskMessage>() {
+
+ private int offset = 0;
+ private int size = 0;
+ {
+ for (ArrayList<TaskMessage> list : bundle) {
+ size += list.size();
+ }
+ }
+
+ private int bundleOffset = 0;
+ private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
+
+ @Override
+ public boolean hasNext() {
+ if (offset < size) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public TaskMessage next() {
+ TaskMessage msg = null;
+ if (iter.hasNext()) {
+ msg = iter.next();
+ } else {
+ bundleOffset++;
+ iter = bundle.get(bundleOffset).iterator();
+ msg = iter.next();
+ }
+ if (null != msg) {
+ offset++;
+ }
+ return msg;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("not supported");
+ }
+ };
+ }
+
+ public void clear() {
+ bundles.clear();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
index 45725c9..ce2a0b3 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleHelpers.java
@@ -22,9 +22,9 @@ import backtype.storm.tuple.Tuple;
public class TupleHelpers {
private TupleHelpers() {
-
+
}
-
+
public static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
index f9fb2c0..80b78d8 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TupleUtils.java
@@ -22,14 +22,13 @@ import backtype.storm.tuple.Tuple;
public final class TupleUtils {
- private TupleUtils() {
- // No instantiation
- }
+ private TupleUtils() {
+ // No instantiation
+ }
- public static boolean isTick(Tuple tuple) {
- return tuple != null
- && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
- && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
- }
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
index 0669cfb..9194d07 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Utils.java
@@ -17,45 +17,21 @@
*/
package backtype.storm.utils;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Constructor;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
+import backtype.storm.Config;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.ComponentObject;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.serialization.DefaultSerializationDelegate;
+import backtype.storm.serialization.SerializationDelegate;
+import clojure.lang.IFn;
+import clojure.lang.RT;
+import com.alibaba.jstorm.utils.LoadConf;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.thrift.TException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -64,20 +40,19 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.SafeConstructor;
-import backtype.storm.Config;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.serialization.DefaultSerializationDelegate;
-import backtype.storm.serialization.SerializationDelegate;
-import clojure.lang.IFn;
-import clojure.lang.RT;
-
-import com.alibaba.jstorm.utils.LoadConf;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.*;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.net.URLDecoder;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
public class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -99,7 +74,7 @@ public class Utils {
}
}
- public static Object newInstance(String klass, Object ...params) {
+ public static Object newInstance(String klass, Object... params) {
try {
Class c = Class.forName(klass);
Constructor[] constructors = c.getConstructors();
@@ -111,7 +86,7 @@ public class Utils {
break;
}
}
-
+
if (con == null) {
throw new RuntimeException("Cound not found the corresponding constructor, params=" + params.toString());
} else {
@@ -128,35 +103,34 @@ public class Utils {
/**
* Go thrift gzip serializer
+ *
* @param obj
* @return
*/
public static byte[] serialize(Object obj) {
/**
- * @@@
- * JStorm disable the thrift.gz.serializer
+ * @@@ JStorm disable the thrift.gz.serializer
*/
- //return serializationDelegate.serialize(obj);
+ // return serializationDelegate.serialize(obj);
return javaSerialize(obj);
}
/**
* Go thrift gzip serializer
- * @param obj
+ *
* @return
*/
public static <T> T deserialize(byte[] serialized, Class<T> clazz) {
/**
- * @@@
- * JStorm disable the thrift.gz.serializer
+ * @@@ JStorm disable the thrift.gz.serializer
*/
- //return serializationDelegate.deserialize(serialized, clazz);
- return (T)javaDeserialize(serialized);
+ // return serializationDelegate.deserialize(serialized, clazz);
+ return (T) javaDeserialize(serialized);
}
public static byte[] javaSerialize(Object obj) {
if (obj instanceof byte[]) {
- return (byte[])obj;
+ return (byte[]) obj;
}
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -168,7 +142,7 @@ public class Utils {
throw new RuntimeException(e);
}
}
-
+
public static Object maybe_deserialize(byte[] data) {
if (data == null || data.length == 0) {
return null;
@@ -179,9 +153,10 @@ public class Utils {
return null;
}
}
-
+
/**
* Deserialized with ClassLoader
+ *
* @param serialized
* @param loader
* @return
@@ -206,20 +181,20 @@ public class Utils {
throw new RuntimeException(e);
}
}
-
+
public static Object javaDeserialize(byte[] serialized) {
return javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
}
-
+
public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
- return (T)javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
+ return (T) javaDeserializeWithCL(serialized, WorkerClassLoader.getInstance());
}
-
+
public static String to_json(Object m) {
// return JSON.toJSONString(m);
return JSONValue.toJSONString(m);
}
-
+
public static Object from_json(String json) {
if (json == null) {
return null;
@@ -228,14 +203,14 @@ public class Utils {
return JSONValue.parse(json);
}
}
-
+
public static String toPrettyJsonString(Object obj) {
Gson gson2 = new GsonBuilder().setPrettyPrinting().create();
String ret = gson2.toJson(obj);
-
+
return ret;
}
-
+
public static byte[] gzip(byte[] data) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -269,9 +244,9 @@ public class Utils {
public static <T> String join(Iterable<T> coll, String sep) {
Iterator<T> it = coll.iterator();
String ret = "";
- while(it.hasNext()) {
+ while (it.hasNext()) {
ret = ret + it.next();
- if(it.hasNext()) {
+ if (it.hasNext()) {
ret = ret + sep;
}
}
@@ -281,13 +256,14 @@ public class Utils {
public static void sleep(long millis) {
try {
Time.sleep(millis);
- } catch(InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* Please directly use LoadConf.findResources(name);
+ *
* @param name
* @return
*/
@@ -298,6 +274,7 @@ public class Utils {
/**
* Please directly use LoadConf.findAndReadYaml(name);
+ *
* @param name
* @return
*/
@@ -306,9 +283,8 @@ public class Utils {
return LoadConf.findAndReadYaml(name, mustExist, false);
}
-
public static Map findAndReadConfigFile(String name) {
- return LoadConf.findAndReadYaml(name, true, false);
+ return LoadConf.findAndReadYaml(name, true, false);
}
public static Map readDefaultConfig() {
@@ -318,7 +294,7 @@ public class Utils {
public static Map readCommandLineOpts() {
Map ret = new HashMap();
String commandOptions = System.getProperty("storm.options");
- if(commandOptions != null) {
+ if (commandOptions != null) {
String[] configs = commandOptions.split(",");
for (String config : configs) {
config = URLDecoder.decode(config);
@@ -335,22 +311,21 @@ public class Utils {
return ret;
}
-
public static void replaceLocalDir(Map<Object, Object> conf) {
String stormHome = System.getProperty("jstorm.home");
boolean isEmpty = StringUtils.isBlank(stormHome);
-
+
Map<Object, Object> replaceMap = new HashMap<Object, Object>();
-
+
for (Entry entry : conf.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
-
+
if (value instanceof String) {
if (StringUtils.isBlank((String) value) == true) {
continue;
}
-
+
String str = (String) value;
if (isEmpty == true) {
// replace %JSTORM_HOME% as current directory
@@ -358,20 +333,20 @@ public class Utils {
} else {
str = str.replace("%JSTORM_HOME%", stormHome);
}
-
+
replaceMap.put(key, str);
}
}
-
+
conf.putAll(replaceMap);
}
-
+
public static Map loadDefinedConf(String confFile) {
File file = new File(confFile);
if (file.exists() == false) {
return findAndReadConfigFile(confFile, true);
}
-
+
Yaml yaml = new Yaml();
Map ret;
try {
@@ -381,10 +356,10 @@ public class Utils {
}
if (ret == null)
ret = new HashMap();
-
+
return new HashMap(ret);
}
-
+
public static Map readStormConfig() {
Map ret = readDefaultConfig();
String confFile = System.getProperty("storm.conf.file");
@@ -396,40 +371,41 @@ public class Utils {
}
ret.putAll(storm);
ret.putAll(readCommandLineOpts());
-
+
replaceLocalDir(ret);
return ret;
}
private static Object normalizeConf(Object conf) {
- if(conf==null) return new HashMap();
- if(conf instanceof Map) {
+ if (conf == null)
+ return new HashMap();
+ if (conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
- for(Object key: confMap.keySet()) {
+ for (Object key : confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
}
return confMap;
- } else if(conf instanceof List) {
- List confList = new ArrayList((List) conf);
- for(int i=0; i<confList.size(); i++) {
+ } else if (conf instanceof List) {
+ List confList = new ArrayList((List) conf);
+ for (int i = 0; i < confList.size(); i++) {
Object val = confList.get(i);
confList.set(i, normalizeConf(val));
}
return confList;
} else if (conf instanceof Integer) {
return ((Integer) conf).longValue();
- } else if(conf instanceof Float) {
+ } else if (conf instanceof Float) {
return ((Float) conf).doubleValue();
} else {
return conf;
}
}
-
+
public static boolean isValidConf(Map<String, Object> stormConf) {
return normalizeConf(stormConf).equals(normalizeConf(Utils.from_json(Utils.to_json(stormConf))));
}
-
+
public static Object getSetComponentObject(ComponentObject obj, URLClassLoader loader) {
if (obj.getSetField() == ComponentObject._Fields.SERIALIZED_JAVA) {
return javaDeserializeWithCL(obj.get_serialized_java(), loader);
@@ -439,7 +415,7 @@ public class Utils {
return obj.get_shell();
}
}
-
+
public static <S, T> T get(Map<S, T> m, S key, T def) {
T ret = m.get(key);
if (ret == null) {
@@ -447,7 +423,7 @@ public class Utils {
}
return ret;
}
-
+
public static List<Object> tuple(Object... values) {
List<Object> ret = new ArrayList<Object>();
for (Object v : values) {
@@ -455,7 +431,7 @@ public class Utils {
}
return ret;
}
-
+
public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
WritableByteChannel out = null;
NimbusClient client = null;
@@ -478,12 +454,12 @@ public class Utils {
client.close();
}
}
-
+
public static IFn loadClojureFn(String namespace, String name) {
try {
- clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
+ clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")"));
} catch (Exception e) {
- //if playing from the repl and defining functions, file won't exist
+ // if playing from the repl and defining functions, file won't exist
}
return (IFn) RT.var(namespace, name).deref();
}
@@ -494,38 +470,38 @@ public class Utils {
public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
- for(K key: map.keySet()) {
+ for (K key : map.keySet()) {
ret.put(map.get(key), key);
}
return ret;
}
public static ComponentCommon getComponentCommon(StormTopology topology, String id) {
- if(topology.get_spouts().containsKey(id)) {
+ if (topology.get_spouts().containsKey(id)) {
return topology.get_spouts().get(id).get_common();
}
- if(topology.get_bolts().containsKey(id)) {
+ if (topology.get_bolts().containsKey(id)) {
return topology.get_bolts().get(id).get_common();
}
- if(topology.get_state_spouts().containsKey(id)) {
+ if (topology.get_state_spouts().containsKey(id)) {
return topology.get_state_spouts().get(id).get_common();
}
throw new IllegalArgumentException("Could not find component with id " + id);
}
public static Integer getInt(Object o) {
- Integer result = getInt(o, null);
- if (null == result) {
- throw new IllegalArgumentException("Don't know how to convert null to int");
- }
- return result;
+ Integer result = getInt(o, null);
+ if (null == result) {
+ throw new IllegalArgumentException("Don't know how to convert null to int");
+ }
+ return result;
}
-
+
public static Integer getInt(Object o, Integer defaultValue) {
if (null == o) {
return defaultValue;
}
-
+
if (o instanceof Number) {
return ((Number) o).intValue();
} else if (o instanceof String) {
@@ -534,38 +510,18 @@ public class Utils {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
}
-
+
public static long secureRandomLong() {
return UUID.randomUUID().getLeastSignificantBits();
}
-
- public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry {
-
- protected final int maxRetryInterval;
-
- public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepTimeMs) {
- super(baseSleepTimeMs, maxRetries);
- this.maxRetryInterval = maxSleepTimeMs;
- }
-
- public int getMaxRetryInterval() {
- return this.maxRetryInterval;
- }
-
- @Override
- public int getSleepTimeMs(int count, long elapsedMs) {
- return Math.min(maxRetryInterval, super.getSleepTimeMs(count, elapsedMs));
- }
-
- }
-
+
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
- for(String zkServer: (List<String>) servers) {
+ for (String zkServer : (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
@@ -576,17 +532,15 @@ public class Utils {
return builder.build();
}
- protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
- {
+ protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) {
builder.connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new StormBoundedExponentialBackoffRetry(
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
-
- if(auth!=null && auth.scheme!=null && auth.payload!=null) {
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(
+ new StormBoundedExponentialBackoffRetry(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), Utils.getInt(conf
+ .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES))));
+
+ if (auth != null && auth.scheme != null && auth.payload != null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
}
@@ -608,15 +562,10 @@ public class Utils {
}
/**
- *
-(defn integer-divided [sum num-pieces]
- (let [base (int (/ sum num-pieces))
- num-inc (mod sum num-pieces)
- num-bases (- num-pieces num-inc)]
- (if (= num-inc 0)
- {base num-bases}
- {base num-bases (inc base) num-inc}
- )))
+ *
+ (defn integer-divided [sum num-pieces] (let [base (int (/ sum num-pieces)) num-inc (mod sum num-pieces) num-bases (- num-pieces num-inc)] (if (= num-inc
+ * 0) {base num-bases} {base num-bases (inc base) num-inc} )))
+ *
* @param sum
* @param numPieces
* @return
@@ -628,8 +577,8 @@ public class Utils {
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
- if(numInc!=0) {
- ret.put(base+1, numInc);
+ if (numInc != 0) {
+ ret.put(base + 1, numInc);
}
return ret;
}
@@ -644,7 +593,7 @@ public class Utils {
try {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
String line = null;
- while ((line = r.readLine())!= null) {
+ while ((line = r.readLine()) != null) {
LOG.info("{}:{}", prefix, line);
}
} catch (IOException e) {
@@ -654,8 +603,8 @@ public class Utils {
public static boolean exceptionCauseIsInstanceOf(Class klass, Throwable throwable) {
Throwable t = throwable;
- while(t != null) {
- if(klass.isInstance(t)) {
+ while (t != null) {
+ if (klass.isInstance(t)) {
return true;
}
t = t.getCause();
@@ -664,71 +613,70 @@ public class Utils {
}
/**
- * Is the cluster configured to interact with ZooKeeper in a secure way?
- * This only works when called from within Nimbus or a Supervisor process.
+ * Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
+ *
* @param conf the storm configuration, not the topology configuration
* @return true if it is configured else false.
*/
public static boolean isZkAuthenticationConfiguredStormServer(Map conf) {
return null != System.getProperty("java.security.auth.login.config")
- || (conf != null
- && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
+ || (conf != null && conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME) != null && !((String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME)).isEmpty());
}
/**
* Is the topology configured to have ZooKeeper authentication.
+ *
* @param conf the topology configuration
* @return true if ZK is configured else false
*/
public static boolean isZkAuthenticationConfiguredTopology(Map conf) {
- return (conf != null
- && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null
- && ! ((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
+ return (conf != null && conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME) != null && !((String) conf
+ .get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
public static List<ACL> getWorkerACL(Map conf) {
- //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
+ // This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct
+ // perms
if (!isZkAuthenticationConfiguredTopology(conf)) {
return null;
}
- String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
+ String stormZKUser = (String) conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
- throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+ throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
}
- String[] split = stormZKUser.split(":",2);
+ String[] split = stormZKUser.split(":", 2);
if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
return ret;
}
- public static String threadDump() {
- final StringBuilder dump = new StringBuilder();
- final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
- final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
- for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
- dump.append('"');
- dump.append(threadInfo.getThreadName());
- dump.append("\" ");
- final Thread.State state = threadInfo.getThreadState();
- dump.append("\n java.lang.Thread.State: ");
- dump.append(state);
- final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
- for (final StackTraceElement stackTraceElement : stackTraceElements) {
- dump.append("\n at ");
- dump.append(stackTraceElement);
- }
- dump.append("\n\n");
- }
- return dump.toString();
- }
+ public static String threadDump() {
+ final StringBuilder dump = new StringBuilder();
+ final java.lang.management.ThreadMXBean threadMXBean = java.lang.management.ManagementFactory.getThreadMXBean();
+ final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100);
+ for (java.lang.management.ThreadInfo threadInfo : threadInfos) {
+ dump.append('"');
+ dump.append(threadInfo.getThreadName());
+ dump.append("\" ");
+ final Thread.State state = threadInfo.getThreadState();
+ dump.append("\n java.lang.Thread.State: ");
+ dump.append(state);
+ final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace();
+ for (final StackTraceElement stackTraceElement : stackTraceElements) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n\n");
+ }
+ return dump.toString();
+ }
// Assumes caller is synchronizing
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
- String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
+ String delegateClassName = (String) stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
SerializationDelegate delegate;
try {
Class delegateClass = Class.forName(delegateClassName);
@@ -747,27 +695,25 @@ public class Utils {
return delegate;
}
- public static void handleUncaughtException(Throwable t) {
- if (t != null && t instanceof Error) {
- if (t instanceof OutOfMemoryError) {
- try {
- System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
- } catch (Throwable err) {
- //Again we don't want to exit because of logging issues.
+ public static void handleUncaughtException(Throwable t) {
+ if (t != null && t instanceof Error) {
+ if (t instanceof OutOfMemoryError) {
+ try {
+ System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName());
+ } catch (Throwable err) {
+ // Again we don't want to exit because of logging issues.
+ }
+ Runtime.getRuntime().halt(-1);
+ } else {
+ // Running in daemon mode, we would pass Error to calling thread.
+ throw (Error) t;
+ }
}
- Runtime.getRuntime().halt(-1);
- } else {
- //Running in daemon mode, we would pass Error to calling thread.
- throw (Error) t;
- }
}
- }
-
-
public static List<String> tokenize_path(String path) {
String[] toks = path.split("/");
- java.util.ArrayList<String> rtn = new ArrayList<String>();
+ ArrayList<String> rtn = new ArrayList<String>();
for (String str : toks) {
if (!str.isEmpty()) {
rtn.add(str);
@@ -775,7 +721,7 @@ public class Utils {
}
return rtn;
}
-
+
public static String toks_to_path(List<String> toks) {
StringBuffer buff = new StringBuffer();
buff.append("/");
@@ -785,16 +731,16 @@ public class Utils {
if (i < (size - 1)) {
buff.append("/");
}
-
+
}
return buff.toString();
}
-
+
public static String normalize_path(String path) {
String rtn = toks_to_path(tokenize_path(path));
return rtn;
}
-
+
public static String printStack() {
StringBuilder sb = new StringBuilder();
sb.append("\nCurrent call stack:\n");
@@ -802,14 +748,14 @@ public class Utils {
for (int i = 2; i < stackElements.length; i++) {
sb.append("\t").append(stackElements[i]).append("\n");
}
-
+
return sb.toString();
}
-
+
private static Map loadProperty(String prop) {
Map ret = new HashMap<Object, Object>();
Properties properties = new Properties();
-
+
try {
InputStream stream = new FileInputStream(prop);
properties.load(stream);
@@ -826,14 +772,14 @@ public class Utils {
e1.printStackTrace();
throw new RuntimeException(e1.getMessage());
}
-
+
return ret;
}
-
+
private static Map loadYaml(String confPath) {
Map ret = new HashMap<Object, Object>();
Yaml yaml = new Yaml();
-
+
try {
InputStream stream = new FileInputStream(confPath);
ret = (Map) yaml.load(stream);
@@ -848,10 +794,10 @@ public class Utils {
e1.printStackTrace();
throw new RuntimeException("Failed to read config file");
}
-
+
return ret;
}
-
+
public static Map loadConf(String arg) {
Map ret = null;
if (arg.endsWith("yaml")) {
@@ -866,13 +812,11 @@ public class Utils {
String ret = "";
InputStream input = null;
try {
- input =
- Utils.class.getClassLoader().getResourceAsStream("version");
+ input = Utils.class.getClassLoader().getResourceAsStream("version");
BufferedReader in = new BufferedReader(new InputStreamReader(input));
- String s = in.readLine();
- ret = s.trim();
-
-
+ String s = in.readLine();
+ ret = s.trim();
+
} catch (Exception e) {
LOG.warn("Failed to get version", e);
} finally {
@@ -892,7 +836,7 @@ public class Utils {
bytes[offset++] = (byte) (value & 0x000000FF);
bytes[offset++] = (byte) ((value & 0x0000FF00) >> 8);
bytes[offset++] = (byte) ((value & 0x00FF0000) >> 16);
- bytes[offset] = (byte) ((value & 0xFF000000) >> 24);
+ bytes[offset] = (byte) ((value & 0xFF000000) >> 24);
}
public static int readIntFromByteArray(byte[] bytes, int offset) {
@@ -900,7 +844,7 @@ public class Utils {
ret = ret | (bytes[offset++] & 0x000000FF);
ret = ret | ((bytes[offset++] << 8) & 0x0000FF00);
ret = ret | ((bytes[offset++] << 16) & 0x00FF0000);
- ret = ret | ((bytes[offset] << 24) & 0xFF000000);
+ ret = ret | ((bytes[offset] << 24) & 0xFF000000);
return ret;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
index 1740e18..456dfd0 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionInfo.java
@@ -24,108 +24,102 @@ import java.util.Properties;
public class VersionInfo {
- private Properties info;
-
- protected VersionInfo(String component) {
- info = new Properties();
- String versionInfoFile = component + "-version-info.properties";
- InputStream is = null;
- try {
- is = Thread.currentThread().getContextClassLoader()
- .getResourceAsStream(versionInfoFile);
- if (is == null) {
- throw new IOException("Resource not found");
- }
- info.load(is);
- } catch (IOException ex) {
- } finally {
- if (is != null) {
- try {
-
- is.close();
- } catch (IOException ioex) {
- }
-
- }
- }
- }
-
- protected String _getVersion() {
- return info.getProperty("version", "Unknown");
- }
-
- protected String _getRevision() {
- return info.getProperty("revision", "Unknown");
- }
-
- protected String _getBranch() {
- return info.getProperty("branch", "Unknown");
- }
-
- protected String _getDate() {
- return info.getProperty("date", "Unknown");
- }
-
- protected String _getUser() {
- return info.getProperty("user", "Unknown");
- }
-
- protected String _getUrl() {
- return info.getProperty("url", "Unknown");
- }
-
- protected String _getSrcChecksum() {
- return info.getProperty("srcChecksum", "Unknown");
- }
-
- protected String _getBuildVersion(){
- return getVersion() +
- " from " + _getRevision() +
- " by " + _getUser() +
- " source checksum " + _getSrcChecksum();
- }
-
-
- private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
-
- public static String getVersion() {
- return COMMON_VERSION_INFO._getVersion();
- }
-
- public static String getRevision() {
- return COMMON_VERSION_INFO._getRevision();
- }
-
- public static String getBranch() {
- return COMMON_VERSION_INFO._getBranch();
- }
-
- public static String getDate() {
- return COMMON_VERSION_INFO._getDate();
- }
-
- public static String getUser() {
- return COMMON_VERSION_INFO._getUser();
- }
-
- public static String getUrl() {
- return COMMON_VERSION_INFO._getUrl();
- }
-
- public static String getSrcChecksum() {
- return COMMON_VERSION_INFO._getSrcChecksum();
- }
-
- public static String getBuildVersion(){
- return COMMON_VERSION_INFO._getBuildVersion();
- }
-
-
- public static void main(String[] args) {
- System.out.println("Storm " + getVersion());
- System.out.println("URL " + getUrl() + " -r " + getRevision());
- System.out.println("Branch " + getBranch());
- System.out.println("Compiled by " + getUser() + " on " + getDate());
- System.out.println("From source with checksum " + getSrcChecksum());
- }
+ private Properties info;
+
+ protected VersionInfo(String component) {
+ info = new Properties();
+ String versionInfoFile = component + "-version-info.properties";
+ InputStream is = null;
+ try {
+ is = Thread.currentThread().getContextClassLoader().getResourceAsStream(versionInfoFile);
+ if (is == null) {
+ throw new IOException("Resource not found");
+ }
+ info.load(is);
+ } catch (IOException ex) {
+ } finally {
+ if (is != null) {
+ try {
+
+ is.close();
+ } catch (IOException ioex) {
+ }
+
+ }
+ }
+ }
+
+ protected String _getVersion() {
+ return info.getProperty("version", "Unknown");
+ }
+
+ protected String _getRevision() {
+ return info.getProperty("revision", "Unknown");
+ }
+
+ protected String _getBranch() {
+ return info.getProperty("branch", "Unknown");
+ }
+
+ protected String _getDate() {
+ return info.getProperty("date", "Unknown");
+ }
+
+ protected String _getUser() {
+ return info.getProperty("user", "Unknown");
+ }
+
+ protected String _getUrl() {
+ return info.getProperty("url", "Unknown");
+ }
+
+ protected String _getSrcChecksum() {
+ return info.getProperty("srcChecksum", "Unknown");
+ }
+
+ protected String _getBuildVersion() {
+ return getVersion() + " from " + _getRevision() + " by " + _getUser() + " source checksum " + _getSrcChecksum();
+ }
+
+ private static VersionInfo COMMON_VERSION_INFO = new VersionInfo("storm-core");
+
+ public static String getVersion() {
+ return COMMON_VERSION_INFO._getVersion();
+ }
+
+ public static String getRevision() {
+ return COMMON_VERSION_INFO._getRevision();
+ }
+
+ public static String getBranch() {
+ return COMMON_VERSION_INFO._getBranch();
+ }
+
+ public static String getDate() {
+ return COMMON_VERSION_INFO._getDate();
+ }
+
+ public static String getUser() {
+ return COMMON_VERSION_INFO._getUser();
+ }
+
+ public static String getUrl() {
+ return COMMON_VERSION_INFO._getUrl();
+ }
+
+ public static String getSrcChecksum() {
+ return COMMON_VERSION_INFO._getSrcChecksum();
+ }
+
+ public static String getBuildVersion() {
+ return COMMON_VERSION_INFO._getBuildVersion();
+ }
+
+ public static void main(String[] args) {
+ System.out.println("Storm " + getVersion());
+ System.out.println("URL " + getUrl() + " -r " + getRevision());
+ System.out.println("Branch " + getBranch());
+ System.out.println("Compiled by " + getUser() + " on " + getDate());
+ System.out.println("From source with checksum " + getSrcChecksum());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
index 07ce5a8..0852292 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/VersionedStore.java
@@ -30,10 +30,10 @@ public class VersionedStore {
private static final String FINISHED_VERSION_SUFFIX = ".version";
private String _root;
-
+
public VersionedStore(String path) throws IOException {
- _root = path;
- mkdirs(_root);
+ _root = path;
+ mkdirs(_root);
}
public String getRoot() {
@@ -46,26 +46,30 @@ public class VersionedStore {
public String mostRecentVersionPath() throws IOException {
Long v = mostRecentVersion();
- if(v==null) return null;
+ if (v == null)
+ return null;
return versionPath(v);
}
public String mostRecentVersionPath(long maxVersion) throws IOException {
Long v = mostRecentVersion(maxVersion);
- if(v==null) return null;
+ if (v == null)
+ return null;
return versionPath(v);
}
public Long mostRecentVersion() throws IOException {
List<Long> all = getAllVersions();
- if(all.size()==0) return null;
+ if (all.size() == 0)
+ return null;
return all.get(0);
}
public Long mostRecentVersion(long maxVersion) throws IOException {
List<Long> all = getAllVersions();
- for(Long v: all) {
- if(v <= maxVersion) return v;
+ for (Long v : all) {
+ if (v <= maxVersion)
+ return v;
}
return null;
}
@@ -73,7 +77,7 @@ public class VersionedStore {
public String createVersion() throws IOException {
Long mostRecent = mostRecentVersion();
long version = Time.currentTimeMillis();
- if(mostRecent!=null && version <= mostRecent) {
+ if (mostRecent != null && version <= mostRecent) {
version = mostRecent + 1;
}
return createVersion(version);
@@ -81,7 +85,7 @@ public class VersionedStore {
public String createVersion(long version) throws IOException {
String ret = versionPath(version);
- if(getAllVersions().contains(version))
+ if (getAllVersions().contains(version))
throw new RuntimeException("Version already exists or data already exists");
else
return ret;
@@ -95,11 +99,11 @@ public class VersionedStore {
File versionFile = new File(versionPath(version));
File tokenFile = new File(tokenPath(version));
- if(tokenFile.exists()) {
+ if (tokenFile.exists()) {
FileUtils.forceDelete(tokenFile);
}
- if(versionFile.exists()) {
+ if (versionFile.exists()) {
FileUtils.forceDelete(versionFile);
}
}
@@ -116,14 +120,14 @@ public class VersionedStore {
public void cleanup(int versionsToKeep) throws IOException {
List<Long> versions = getAllVersions();
- if(versionsToKeep >= 0) {
+ if (versionsToKeep >= 0) {
versions = versions.subList(0, Math.min(versions.size(), versionsToKeep));
}
HashSet<Long> keepers = new HashSet<Long>(versions);
- for(String p: listDir(_root)) {
+ for (String p : listDir(_root)) {
Long v = parseVersion(p);
- if(v!=null && !keepers.contains(v)) {
+ if (v != null && !keepers.contains(v)) {
deleteVersion(v);
}
}
@@ -134,8 +138,8 @@ public class VersionedStore {
*/
public List<Long> getAllVersions() throws IOException {
List<Long> ret = new ArrayList<Long>();
- for(String s: listDir(_root)) {
- if(s.endsWith(FINISHED_VERSION_SUFFIX)) {
+ for (String s : listDir(_root)) {
+ if (s.endsWith(FINISHED_VERSION_SUFFIX)) {
ret.add(validateAndGetVersion(s));
}
}
@@ -150,18 +154,19 @@ public class VersionedStore {
private long validateAndGetVersion(String path) {
Long v = parseVersion(path);
- if(v==null) throw new RuntimeException(path + " is not a valid version");
+ if (v == null)
+ throw new RuntimeException(path + " is not a valid version");
return v;
}
private Long parseVersion(String path) {
String name = new File(path).getName();
- if(name.endsWith(FINISHED_VERSION_SUFFIX)) {
- name = name.substring(0, name.length()-FINISHED_VERSION_SUFFIX.length());
+ if (name.endsWith(FINISHED_VERSION_SUFFIX)) {
+ name = name.substring(0, name.length() - FINISHED_VERSION_SUFFIX.length());
}
try {
return Long.parseLong(name);
- } catch(NumberFormatException e) {
+ } catch (NumberFormatException e) {
return null;
}
}
@@ -173,12 +178,12 @@ public class VersionedStore {
private void mkdirs(String path) throws IOException {
new File(path).mkdirs();
}
-
+
private List<String> listDir(String dir) throws IOException {
List<String> ret = new ArrayList<String>();
File[] contents = new File(dir).listFiles();
- if(contents!=null) {
- for(File f: contents) {
+ if (contents != null) {
+ for (File f : contents) {
ret.add(f.getAbsolutePath());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
index 5a288a0..6290f5a 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WindowedTimeThrottler.java
@@ -22,28 +22,28 @@ public class WindowedTimeThrottler {
int _maxAmt;
long _windowStartTime;
int _windowEvents = 0;
-
+
public WindowedTimeThrottler(Number windowMillis, Number maxAmt) {
_windowMillis = windowMillis.longValue();
_maxAmt = maxAmt.intValue();
_windowStartTime = System.currentTimeMillis();
}
-
+
public boolean isThrottled() {
resetIfNecessary();
return _windowEvents >= _maxAmt;
}
-
- //returns void if the event should continue, false if the event should not be done
+
+ // returns void if the event should continue, false if the event should not be done
public void markEvent() {
resetIfNecessary();
_windowEvents++;
-
+
}
-
+
private void resetIfNecessary() {
long now = System.currentTimeMillis();
- if(now - _windowStartTime > _windowMillis) {
+ if (now - _windowStartTime > _windowMillis) {
_windowStartTime = now;
_windowEvents = 0;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
index f3526b1..4c2f35c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WorkerClassLoader.java
@@ -28,30 +28,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class WorkerClassLoader extends URLClassLoader {
-
+
public static Logger LOG = LoggerFactory.getLogger(WorkerClassLoader.class);
-
+
private ClassLoader defaultClassLoader;
-
+
private ClassLoader JDKClassLoader;
-
+
private boolean isDebug;
-
+
protected static WorkerClassLoader instance;
-
+
protected static boolean enable;
-
+
protected static Map<Thread, ClassLoader> threadContextCache;
-
+
protected WorkerClassLoader(URL[] urls, ClassLoader defaultClassLoader, ClassLoader JDKClassLoader, boolean isDebug) {
super(urls, JDKClassLoader);
this.defaultClassLoader = defaultClassLoader;
this.JDKClassLoader = JDKClassLoader;
this.isDebug = isDebug;
-
+
// TODO Auto-generated constructor stub
}
-
+
// for all log go through logback when enable classloader
protected boolean isLogByDefault(String name) {
if (name.startsWith("org.apache.log4j")) {
@@ -59,11 +59,11 @@ public class WorkerClassLoader extends URLClassLoader {
} else if (name.startsWith("org.slf4j")) {
return true;
}
-
+
return false;
-
+
}
-
+
protected boolean isLoadByDefault(String name) {
if (name.startsWith("backtype.storm") == true) {
return true;
@@ -75,100 +75,101 @@ public class WorkerClassLoader extends URLClassLoader {
return false;
}
}
-
+
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
Class<?> result = null;
try {
result = this.findLoadedClass(name);
-
+
if (result != null) {
return result;
}
-
+
try {
result = JDKClassLoader.loadClass(name);
if (result != null)
return result;
} catch (Exception e) {
-
+
}
-
+
try {
if (isLoadByDefault(name) == false) {
result = findClass(name);
-
+
if (result != null) {
return result;
}
}
-
+
} catch (Exception e) {
-
+
}
-
+
result = defaultClassLoader.loadClass(name);
return result;
-
+
} finally {
if (result != null) {
ClassLoader resultClassLoader = result.getClassLoader();
- LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader());
+ LOG.info("Successfully load class " + name + " by " + resultClassLoader + ",threadContextLoader:"
+ + Thread.currentThread().getContextClassLoader());
} else {
LOG.warn("Failed to load class " + name + ",threadContextLoader:" + Thread.currentThread().getContextClassLoader());
}
-
+
if (isDebug) {
LOG.info(Utils.printStack());
}
}
-
+
}
-
+
public static WorkerClassLoader mkInstance(URL[] urls, ClassLoader DefaultClassLoader, ClassLoader JDKClassLoader, boolean enable, boolean isDebug) {
WorkerClassLoader.enable = enable;
if (enable == false) {
LOG.info("Don't enable UserDefine ClassLoader");
return null;
}
-
+
synchronized (WorkerClassLoader.class) {
if (instance == null) {
instance = new WorkerClassLoader(urls, DefaultClassLoader, JDKClassLoader, isDebug);
-
+
threadContextCache = new ConcurrentHashMap<Thread, ClassLoader>();
}
-
+
}
-
+
LOG.info("Successfully create classloader " + mk_list(urls));
return instance;
}
-
+
public static WorkerClassLoader getInstance() {
return instance;
}
-
+
public static boolean isEnable() {
return enable;
}
-
+
public static void switchThreadContext() {
if (enable == false) {
return;
}
-
+
Thread thread = Thread.currentThread();
ClassLoader oldClassLoader = thread.getContextClassLoader();
threadContextCache.put(thread, oldClassLoader);
thread.setContextClassLoader(instance);
}
-
+
public static void restoreThreadContext() {
if (enable == false) {
return;
}
-
+
Thread thread = Thread.currentThread();
ClassLoader oldClassLoader = threadContextCache.get(thread);
if (oldClassLoader != null) {
@@ -177,7 +178,7 @@ public class WorkerClassLoader extends URLClassLoader {
LOG.info("No context classloader of " + thread.getName());
}
}
-
+
private static <V> List<V> mk_list(V... args) {
ArrayList<V> rtn = new ArrayList<V>();
for (V o : args) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
index 8516f97..2c0a2a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/WritableUtils.java
@@ -42,334 +42,314 @@ package backtype.storm.utils;
import java.io.*;
-
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-public final class WritableUtils {
-
- public static byte[] readCompressedByteArray(DataInput in) throws IOException {
- int length = in.readInt();
- if (length == -1) return null;
- byte[] buffer = new byte[length];
- in.readFully(buffer); // could/should use readFully(buffer,0,length)?
- GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
- byte[] outbuf = new byte[length];
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- int len;
- while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
- bos.write(outbuf, 0, len);
- }
- byte[] decompressed = bos.toByteArray();
- bos.close();
- gzi.close();
- return decompressed;
- }
-
- public static void skipCompressedByteArray(DataInput in) throws IOException {
- int length = in.readInt();
- if (length != -1) {
- skipFully(in, length);
- }
- }
-
- public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
- if (bytes != null) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- GZIPOutputStream gzout = new GZIPOutputStream(bos);
- gzout.write(bytes, 0, bytes.length);
- gzout.close();
- byte[] buffer = bos.toByteArray();
- int len = buffer.length;
- out.writeInt(len);
- out.write(buffer, 0, len);
- /* debug only! Once we have confidence, can lose this. */
- return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
- } else {
- out.writeInt(-1);
- return -1;
+public final class WritableUtils {
+
+ public static byte[] readCompressedByteArray(DataInput in) throws IOException {
+ int length = in.readInt();
+ if (length == -1)
+ return null;
+ byte[] buffer = new byte[length];
+ in.readFully(buffer); // could/should use readFully(buffer,0,length)?
+ GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
+ byte[] outbuf = new byte[length];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int len;
+ while ((len = gzi.read(outbuf, 0, outbuf.length)) != -1) {
+ bos.write(outbuf, 0, len);
+ }
+ byte[] decompressed = bos.toByteArray();
+ bos.close();
+ gzi.close();
+ return decompressed;
}
- }
-
-
- /* Ugly utility, maybe someone else can do this better */
- public static String readCompressedString(DataInput in) throws IOException {
- byte[] bytes = readCompressedByteArray(in);
- if (bytes == null) return null;
- return new String(bytes, "UTF-8");
- }
-
-
- public static int writeCompressedString(DataOutput out, String s) throws IOException {
- return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
- }
-
- /*
- *
- * Write a String as a Network Int n, followed by n Bytes
- * Alternative to 16 bit read/writeUTF.
- * Encoding standard is... ?
- *
- */
- public static void writeString(DataOutput out, String s) throws IOException {
- if (s != null) {
- byte[] buffer = s.getBytes("UTF-8");
- int len = buffer.length;
- out.writeInt(len);
- out.write(buffer, 0, len);
- } else {
- out.writeInt(-1);
- }
- }
-
- /*
- * Read a String as a Network Int n, followed by n Bytes
- * Alternative to 16 bit read/writeUTF.
- * Encoding standard is... ?
- *
- */
- public static String readString(DataInput in) throws IOException{
- int length = in.readInt();
- if (length == -1) return null;
- byte[] buffer = new byte[length];
- in.readFully(buffer); // could/should use readFully(buffer,0,length)?
- return new String(buffer,"UTF-8");
- }
-
-
- /*
- * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
- * Could be generalised using introspection.
- *
- */
- public static void writeStringArray(DataOutput out, String[] s) throws IOException{
- out.writeInt(s.length);
- for(int i = 0; i < s.length; i++) {
- writeString(out, s[i]);
+
+ public static void skipCompressedByteArray(DataInput in) throws IOException {
+ int length = in.readInt();
+ if (length != -1) {
+ skipFully(in, length);
+ }
}
- }
-
- /*
- * Write a String array as a Nework Int N, followed by Int N Byte Array of
- * compressed Strings. Handles also null arrays and null values.
- * Could be generalised using introspection.
- *
- */
- public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
- if (s == null) {
- out.writeInt(-1);
- return;
+
+ public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
+ if (bytes != null) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ GZIPOutputStream gzout = new GZIPOutputStream(bos);
+ gzout.write(bytes, 0, bytes.length);
+ gzout.close();
+ byte[] buffer = bos.toByteArray();
+ int len = buffer.length;
+ out.writeInt(len);
+ out.write(buffer, 0, len);
+ /* debug only! Once we have confidence, can lose this. */
+ return ((bytes.length != 0) ? (100 * buffer.length) / bytes.length : 0);
+ } else {
+ out.writeInt(-1);
+ return -1;
+ }
}
- out.writeInt(s.length);
- for(int i = 0; i < s.length; i++) {
- writeCompressedString(out, s[i]);
+
+ /* Ugly utility, maybe someone else can do this better */
+ public static String readCompressedString(DataInput in) throws IOException {
+ byte[] bytes = readCompressedByteArray(in);
+ if (bytes == null)
+ return null;
+ return new String(bytes, "UTF-8");
}
- }
-
- /*
- * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
- * Could be generalised using introspection. Actually this bit couldn't...
- *
- */
- public static String[] readStringArray(DataInput in) throws IOException {
- int len = in.readInt();
- if (len == -1) return null;
- String[] s = new String[len];
- for(int i = 0; i < len; i++) {
- s[i] = readString(in);
+
+ public static int writeCompressedString(DataOutput out, String s) throws IOException {
+ return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
}
- return s;
- }
-
-
- /*
- * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
- * Could be generalised using introspection. Handles null arrays and null values.
- *
- */
- public static String[] readCompressedStringArray(DataInput in) throws IOException {
- int len = in.readInt();
- if (len == -1) return null;
- String[] s = new String[len];
- for(int i = 0; i < len; i++) {
- s[i] = readCompressedString(in);
+
+ /*
+ *
+ * Write a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ?
+ */
+ public static void writeString(DataOutput out, String s) throws IOException {
+ if (s != null) {
+ byte[] buffer = s.getBytes("UTF-8");
+ int len = buffer.length;
+ out.writeInt(len);
+ out.write(buffer, 0, len);
+ } else {
+ out.writeInt(-1);
+ }
}
- return s;
- }
-
-
- /*
- *
- * Test Utility Method Display Byte Array.
- *
- */
- public static void displayByteArray(byte[] record){
- int i;
- for(i=0;i < record.length -1; i++){
- if (i % 16 == 0) { System.out.println(); }
- System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
- System.out.print(Integer.toHexString(record[i] & 0x0F));
- System.out.print(",");
+
+ /*
+ * Read a String as a Network Int n, followed by n Bytes Alternative to 16 bit read/writeUTF. Encoding standard is... ?
+ */
+ public static String readString(DataInput in) throws IOException {
+ int length = in.readInt();
+ if (length == -1)
+ return null;
+ byte[] buffer = new byte[length];
+ in.readFully(buffer); // could/should use readFully(buffer,0,length)?
+ return new String(buffer, "UTF-8");
}
- System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
- System.out.print(Integer.toHexString(record[i] & 0x0F));
- System.out.println();
- }
-
-
- /**
- * Serializes an integer to a binary stream with zero-compressed encoding.
- * For -120 <= i <= 127, only one byte is used with the actual value.
- * For other values of i, the first byte value indicates whether the
- * integer is positive or negative, and the number of bytes that follow.
- * If the first byte value v is between -121 and -124, the following integer
- * is positive, with number of bytes that follow are -(v+120).
- * If the first byte value v is between -125 and -128, the following integer
- * is negative, with number of bytes that follow are -(v+124). Bytes are
- * stored in the high-non-zero-byte-first order.
- *
- * @param stream Binary output stream
- * @param i Integer to be serialized
- * @throws java.io.IOException
- */
- public static void writeVInt(DataOutput stream, int i) throws IOException {
- writeVLong(stream, i);
- }
-
- /**
- * Serializes a long to a binary stream with zero-compressed encoding.
- * For -112 <= i <= 127, only one byte is used with the actual value.
- * For other values of i, the first byte value indicates whether the
- * long is positive or negative, and the number of bytes that follow.
- * If the first byte value v is between -113 and -120, the following long
- * is positive, with number of bytes that follow are -(v+112).
- * If the first byte value v is between -121 and -128, the following long
- * is negative, with number of bytes that follow are -(v+120). Bytes are
- * stored in the high-non-zero-byte-first order.
- *
- * @param stream Binary output stream
- * @param i Long to be serialized
- * @throws java.io.IOException
- */
- public static void writeVLong(DataOutput stream, long i) throws IOException {
- if (i >= -112 && i <= 127) {
- stream.writeByte((byte)i);
- return;
+
+ /*
+ * Write a String array as a Network Int N, followed by N Strings, each written with writeString. Could be generalised using introspection.
+ */
+ public static void writeStringArray(DataOutput out, String[] s) throws IOException {
+ out.writeInt(s.length);
+ for (int i = 0; i < s.length; i++) {
+ writeString(out, s[i]);
+ }
}
- int len = -112;
- if (i < 0) {
- i ^= -1L; // take one's complement'
- len = -120;
+ /*
+ * Write a String array as a Network Int N, followed by N compressed Strings. Also handles null arrays (written as -1) and null values. Could be
+ * generalised using introspection.
+ */
+ public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException {
+ if (s == null) {
+ out.writeInt(-1);
+ return;
+ }
+ out.writeInt(s.length);
+ for (int i = 0; i < s.length; i++) {
+ writeCompressedString(out, s[i]);
+ }
}
- long tmp = i;
- while (tmp != 0) {
- tmp = tmp >> 8;
- len--;
+ /*
+ * Read a String array as a Network Int N, followed by N Strings; returns null when N is -1. Could be generalised using introspection. Actually this bit couldn't...
+ */
+ public static String[] readStringArray(DataInput in) throws IOException {
+ int len = in.readInt();
+ if (len == -1)
+ return null;
+ String[] s = new String[len];
+ for (int i = 0; i < len; i++) {
+ s[i] = readString(in);
+ }
+ return s;
}
- stream.writeByte((byte)len);
+ /*
+ * Read a String array as a Network Int N, followed by N compressed Strings. Could be generalised using introspection. Handles null arrays (N is -1) and null
+ * values.
+ */
+ public static String[] readCompressedStringArray(DataInput in) throws IOException {
+ int len = in.readInt();
+ if (len == -1)
+ return null;
+ String[] s = new String[len];
+ for (int i = 0; i < len; i++) {
+ s[i] = readCompressedString(in);
+ }
+ return s;
+ }
- len = (len < -120) ? -(len + 120) : -(len + 112);
+ /*
+ *
+ * Test utility method: prints a byte array to System.out as comma-separated two-digit hex values, starting a new line every 16 bytes.
+ */
+ public static void displayByteArray(byte[] record) {
+ int i;
+ for (i = 0; i < record.length - 1; i++) {
+ if (i % 16 == 0) {
+ System.out.println();
+ }
+ System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
+ System.out.print(Integer.toHexString(record[i] & 0x0F));
+ System.out.print(",");
+ }
+ System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
+ System.out.print(Integer.toHexString(record[i] & 0x0F));
+ System.out.println();
+ }
- for (int idx = len; idx != 0; idx--) {
- int shiftbits = (idx - 1) * 8;
- long mask = 0xFFL << shiftbits;
- stream.writeByte((byte)((i & mask) >> shiftbits));
+ /**
+ * Serializes an integer to a binary stream with zero-compressed encoding (delegates to writeVLong). For -112 <= i <= 127, only one byte is used with the
+ * actual value. For other values of i, the first byte value indicates whether the integer is positive or negative, and the number of bytes that follow. If
+ * the first byte value v is between -113 and -116, the following integer is positive and -(v+112) bytes follow. If v is between -121 and -124, the
+ * following integer is negative and -(v+120) bytes follow. Bytes are stored in the high-non-zero-byte-first order.
+ *
+ * @param stream Binary output stream
+ * @param i Integer to be serialized
+ * @throws IOException
+ */
+ public static void writeVInt(DataOutput stream, int i) throws IOException {
+ writeVLong(stream, i);
}
- }
-
-
- /**
- * Reads a zero-compressed encoded long from input stream and returns it.
- * @param stream Binary input stream
- * @throws java.io.IOException
- * @return deserialized long from stream.
- */
- public static long readVLong(DataInput stream) throws IOException {
- byte firstByte = stream.readByte();
- int len = decodeVIntSize(firstByte);
- if (len == 1) {
- return firstByte;
+
+ /**
+ * Serializes a long to a binary stream with zero-compressed encoding. For -112 <= i <= 127, only one byte is used with the actual value. For other values
+ * of i, the first byte value indicates whether the long is positive or negative, and the number of bytes that follow. If the first byte value v is between
+ * -113 and -120, the following long is positive, with number of bytes that follow are -(v+112). If the first byte value v is between -121 and -128, the
+ * following long is negative, with number of bytes that follow are -(v+120). Bytes are stored in the high-non-zero-byte-first order.
+ *
+ * @param stream Binary output stream
+ * @param i Long to be serialized
+ * @throws IOException
+ */
+ public static void writeVLong(DataOutput stream, long i) throws IOException {
+ if (i >= -112 && i <= 127) {
+ stream.writeByte((byte) i);
+ return;
+ }
+
+ int len = -112;
+ if (i < 0) {
+ i ^= -1L; // take one's complement'
+ len = -120;
+ }
+
+ long tmp = i;
+ while (tmp != 0) {
+ tmp = tmp >> 8;
+ len--;
+ }
+
+ stream.writeByte((byte) len);
+
+ len = (len < -120) ? -(len + 120) : -(len + 112);
+
+ for (int idx = len; idx != 0; idx--) {
+ int shiftbits = (idx - 1) * 8;
+ long mask = 0xFFL << shiftbits;
+ stream.writeByte((byte) ((i & mask) >> shiftbits));
+ }
}
- long i = 0;
- for (int idx = 0; idx < len-1; idx++) {
- byte b = stream.readByte();
- i = i << 8;
- i = i | (b & 0xFF);
+
+ /**
+ * Reads a zero-compressed encoded long from input stream and returns it.
+ *
+ * @param stream Binary input stream
+ * @throws IOException
+ * @return deserialized long from stream.
+ */
+ public static long readVLong(DataInput stream) throws IOException {
+ byte firstByte = stream.readByte();
+ int len = decodeVIntSize(firstByte);
+ if (len == 1) {
+ return firstByte;
+ }
+ long i = 0;
+ for (int idx = 0; idx < len - 1; idx++) {
+ byte b = stream.readByte();
+ i = i << 8;
+ i = i | (b & 0xFF);
+ }
+ return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
}
- return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
- }
-
- /**
- * Reads a zero-compressed encoded integer from input stream and returns it.
- * @param stream Binary input stream
- * @throws java.io.IOException
- * @return deserialized integer from stream.
- */
- public static int readVInt(DataInput stream) throws IOException {
- return (int) readVLong(stream);
- }
-
- /**
- * Given the first byte of a vint/vlong, determine the sign
- * @param value the first byte
- * @return is the value negative
- */
- public static boolean isNegativeVInt(byte value) {
- return value < -120 || (value >= -112 && value < 0);
- }
-
- /**
- * Parse the first byte of a vint/vlong to determine the number of bytes
- * @param value the first byte of the vint/vlong
- * @return the total number of bytes (1 to 9)
- */
- public static int decodeVIntSize(byte value) {
- if (value >= -112) {
- return 1;
- } else if (value < -120) {
- return -119 - value;
+
+ /**
+ * Reads a zero-compressed encoded integer from input stream and returns it.
+ *
+ * @param stream Binary input stream
+ * @throws IOException
+ * @return deserialized integer from stream.
+ */
+ public static int readVInt(DataInput stream) throws IOException {
+ return (int) readVLong(stream);
}
- return -111 - value;
- }
-
- /**
- * Get the encoded length if an integer is stored in a variable-length format
- * @return the encoded length
- */
- public static int getVIntSize(long i) {
- if (i >= -112 && i <= 127) {
- return 1;
+
+ /**
+ * Given the first byte of a vint/vlong, determine the sign
+ *
+ * @param value the first byte
+ * @return is the value negative
+ */
+ public static boolean isNegativeVInt(byte value) {
+ return value < -120 || (value >= -112 && value < 0);
}
- if (i < 0) {
- i ^= -1L; // take one's complement'
+ /**
+ * Parse the first byte of a vint/vlong to determine the number of bytes
+ *
+ * @param value the first byte of the vint/vlong
+ * @return the total number of bytes (1 to 9)
+ */
+ public static int decodeVIntSize(byte value) {
+ if (value >= -112) {
+ return 1;
+ } else if (value < -120) {
+ return -119 - value;
+ }
+ return -111 - value;
}
- // find the number of bytes with non-leading zeros
- int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
- // find the number of data bytes + length byte
- return (dataBits + 7) / 8 + 1;
- }
-
- /**
- * Skip <i>len</i> number of bytes in input stream<i>in</i>
- * @param in input stream
- * @param len number of bytes to skip
- * @throws IOException when skipped less number of bytes
- */
- public static void skipFully(DataInput in, int len) throws IOException {
- int total = 0;
- int cur = 0;
-
- while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
- total += cur;
+
+ /**
+ * Get the encoded length if an integer is stored in a variable-length format
+ *
+ * @return the encoded length
+ */
+ public static int getVIntSize(long i) {
+ if (i >= -112 && i <= 127) {
+ return 1;
+ }
+
+ if (i < 0) {
+ i ^= -1L; // take one's complement'
+ }
+ // find the number of bytes with non-leading zeros
+ int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
+ // find the number of data bytes + length byte
+ return (dataBits + 7) / 8 + 1;
}
- if (total<len) {
- throw new IOException("Not able to skip " + len + " bytes, possibly " +
- "due to end of input.");
+ /**
+ * Skip <i>len</i> bytes of the input stream <i>in</i>.
+ *
+ * @param in input stream
+ * @param len number of bytes to skip
+ * @throws IOException when fewer than <i>len</i> bytes could be skipped
+ */
+ public static void skipFully(DataInput in, int len) throws IOException {
+ int total = 0;
+ int cur = 0;
+
+ while ((total < len) && ((cur = in.skipBytes(len - total)) > 0)) {
+ total += cur;
+ }
+
+ if (total < len) {
+ throw new IOException("Not able to skip " + len + " bytes, possibly " + "due to end of input.");
+ }
}
- }
}