You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:05 UTC
[27/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
index 7d5ce73..483f0df 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/CRC32OutputStream.java
@@ -23,11 +23,11 @@ import java.util.zip.CRC32;
public class CRC32OutputStream extends OutputStream {
private CRC32 hasher;
-
+
public CRC32OutputStream() {
hasher = new CRC32();
}
-
+
public long getValue() {
return hasher.getValue();
}
@@ -40,5 +40,5 @@ public class CRC32OutputStream extends OutputStream {
@Override
public void write(byte[] bytes, int start, int end) throws IOException {
hasher.update(bytes, start, end);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
index ca9b010..677cf60 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ClojureTimerTask.java
@@ -22,14 +22,14 @@ import java.util.TimerTask;
public class ClojureTimerTask extends TimerTask {
IFn _afn;
-
+
public ClojureTimerTask(IFn afn) {
super();
_afn = afn;
}
-
+
@Override
public void run() {
_afn.run();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Container.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Container.java b/jstorm-core/src/main/java/backtype/storm/utils/Container.java
index d4edcdf..0927e7c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Container.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Container.java
@@ -20,5 +20,5 @@ package backtype.storm.utils;
import java.io.Serializable;
public class Container implements Serializable {
- public Object object;
+ public Object object;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
index b2a2a7d..03ede66 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DRPCClient.java
@@ -46,15 +46,15 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
this.port = port;
this.client = new DistributedRPC.Client(_protocol);
}
-
+
public String getHost() {
return host;
}
-
+
public int getPort() {
return port;
}
-
+
public String execute(String func, String args) throws TException, DRPCExecutionException, AuthorizationException {
return client.execute(func, args);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
index 94768e6..330a5c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueue.java
@@ -32,13 +32,13 @@ public abstract class DisruptorQueue implements IStatefulObject {
public static void setUseSleep(boolean useSleep) {
DisruptorQueueImpl.setUseSleep(useSleep);
}
-
+
private static boolean CAPACITY_LIMITED = false;
-
+
public static void setLimited(boolean limited) {
CAPACITY_LIMITED = limited;
}
-
+
public static DisruptorQueue mkInstance(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
if (CAPACITY_LIMITED == true) {
return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
@@ -46,35 +46,35 @@ public abstract class DisruptorQueue implements IStatefulObject {
return new DisruptorWrapBlockingQueue(queueName, producerType, bufferSize, wait);
}
}
-
+
public abstract String getName();
-
+
public abstract void haltWithInterrupt();
-
+
public abstract Object poll();
-
+
public abstract Object take();
-
+
public abstract void consumeBatch(EventHandler<Object> handler);
-
+
public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);
-
+
public abstract void publish(Object obj);
-
+
public abstract void publish(Object obj, boolean block) throws InsufficientCapacityException;
-
+
public abstract void consumerStarted();
-
+
public abstract void clear();
-
+
public abstract long population();
-
+
public abstract long capacity();
-
+
public abstract long writePos();
-
+
public abstract long readPos();
-
+
public abstract float pctFull();
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
index 58d8313..2941cc9 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
@@ -45,30 +45,30 @@ import com.lmax.disruptor.dsl.ProducerType;
public class DisruptorQueueImpl extends DisruptorQueue {
private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueImpl.class);
static boolean useSleep = true;
-
+
public static void setUseSleep(boolean useSleep) {
AbstractSequencerExt.setWaitSleep(useSleep);
}
-
+
private static final Object FLUSH_CACHE = new Object();
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
-
+
private final String _queueName;
private final RingBuffer<MutableObject> _buffer;
private final Sequence _consumer;
private final SequenceBarrier _barrier;
-
+
// TODO: consider having a threadlocal cache of this variable to speed up
// reads?
volatile boolean consumerStartedFlag = false;
-
+
private final HashMap<String, Object> state = new HashMap<String, Object>(4);
private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
private final Lock readLock = cacheLock.readLock();
private final Lock writeLock = cacheLock.writeLock();
-
+
public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
this._queueName = PREFIX + queueName;
_buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
@@ -89,19 +89,19 @@ public class DisruptorQueueImpl extends DisruptorQueue {
}
}
}
-
+
public String getName() {
return _queueName;
}
-
+
public void consumeBatch(EventHandler<Object> handler) {
consumeBatchToCursor(_barrier.getCursor(), handler);
}
-
+
public void haltWithInterrupt() {
publish(INTERRUPT);
}
-
+
public Object poll() {
// @@@
// should use _cache.isEmpty, but it is slow
@@ -109,7 +109,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
if (consumerStartedFlag == false) {
return _cache.poll();
}
-
+
final long nextSequence = _consumer.get() + 1;
if (nextSequence <= _barrier.getCursor()) {
MutableObject mo = _buffer.get(nextSequence);
@@ -120,7 +120,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
}
return null;
}
-
+
public Object take() {
// @@@
// should use _cache.isEmpty, but it is slow
@@ -128,7 +128,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
if (consumerStartedFlag == false) {
return _cache.poll();
}
-
+
final long nextSequence = _consumer.get() + 1;
// final long availableSequence;
try {
@@ -141,7 +141,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
// throw new RuntimeException(e);
return null;
} catch (TimeoutException e) {
- //LOG.error(e.getCause(), e);
+ // LOG.error(e.getCause(), e);
return null;
}
MutableObject mo = _buffer.get(nextSequence);
@@ -150,7 +150,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
mo.setObject(null);
return ret;
}
-
+
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
try {
final long nextSequence = _consumer.get() + 1;
@@ -165,11 +165,11 @@ public class DisruptorQueueImpl extends DisruptorQueue {
LOG.error("InterruptedException " + e.getCause());
return;
} catch (TimeoutException e) {
- //LOG.error(e.getCause(), e);
+ // LOG.error(e.getCause(), e);
return;
}
}
-
+
public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
try {
@@ -202,7 +202,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
// TODO: only set this if the consumer cursor has changed?
_consumer.set(cursor);
}
-
+
/*
* Caches until consumerStarted is called, upon which the cache is flushed to the consumer
*/
@@ -213,15 +213,15 @@ public class DisruptorQueueImpl extends DisruptorQueue {
throw new RuntimeException("This code should be unreachable!");
}
}
-
+
public void tryPublish(Object obj) throws InsufficientCapacityException {
publish(obj, false);
}
-
+
public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-
+
boolean publishNow = consumerStartedFlag;
-
+
if (!publishNow) {
readLock.lock();
try {
@@ -233,12 +233,12 @@ public class DisruptorQueueImpl extends DisruptorQueue {
readLock.unlock();
}
}
-
+
if (publishNow) {
publishDirect(obj, block);
}
}
-
+
protected void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
final long id;
if (block) {
@@ -250,41 +250,41 @@ public class DisruptorQueueImpl extends DisruptorQueue {
m.setObject(obj);
_buffer.publish(id);
}
-
+
public void consumerStarted() {
-
+
writeLock.lock();
consumerStartedFlag = true;
-
+
writeLock.unlock();
}
-
+
public void clear() {
while (population() != 0L) {
poll();
}
}
-
+
public long population() {
return (writePos() - readPos());
}
-
+
public long capacity() {
return _buffer.getBufferSize();
}
-
+
public long writePos() {
return _buffer.getCursor();
}
-
+
public long readPos() {
return _consumer.get();
}
-
+
public float pctFull() {
return (1.0F * population() / capacity());
}
-
+
@Override
public Object getState() {
// get readPos then writePos so it's never an under-estimate
@@ -296,7 +296,7 @@ public class DisruptorQueueImpl extends DisruptorQueue {
state.put("read_pos", rp);
return state;
}
-
+
public static class ObjectEventFactory implements EventFactory<MutableObject> {
@Override
public MutableObject newInstance() {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
index af5618b..5831a97 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
@@ -36,33 +36,33 @@ import com.lmax.disruptor.dsl.ProducerType;
*/
public class DisruptorWrapBlockingQueue extends DisruptorQueue {
private static final Logger LOG = LoggerFactory.getLogger(DisruptorWrapBlockingQueue.class);
-
+
private static final long QUEUE_CAPACITY = 512;
private LinkedBlockingDeque<Object> queue;
-
+
private String queueName;
-
+
public DisruptorWrapBlockingQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
this.queueName = queueName;
queue = new LinkedBlockingDeque<Object>();
}
-
+
public String getName() {
return queueName;
}
-
+
// poll method
public void consumeBatch(EventHandler<Object> handler) {
consumeBatchToCursor(0, handler);
}
-
+
public void haltWithInterrupt() {
}
-
+
public Object poll() {
return queue.poll();
}
-
+
public Object take() {
try {
return queue.take();
@@ -70,7 +70,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
return null;
}
}
-
+
public void drainQueue(Object object, EventHandler<Object> handler) {
while (object != null) {
try {
@@ -84,7 +84,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
}
}
}
-
+
public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
Object object = queue.poll();
if (object == null) {
@@ -96,16 +96,16 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
throw new RuntimeException(e);
}
}
-
+
drainQueue(object, handler);
-
+
}
-
+
public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
Object object = queue.poll();
drainQueue(object, handler);
}
-
+
/*
* Caches until consumerStarted is called, upon which the cache is flushed to the consumer
*/
@@ -118,17 +118,17 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
}
isSuccess = queue.offer(obj);
}
-
+
}
-
+
public void tryPublish(Object obj) throws InsufficientCapacityException {
boolean isSuccess = queue.offer(obj);
if (isSuccess == false) {
throw InsufficientCapacityException.INSTANCE;
}
-
+
}
-
+
public void publish(Object obj, boolean block) throws InsufficientCapacityException {
if (block == true) {
publish(obj);
@@ -136,21 +136,21 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
tryPublish(obj);
}
}
-
+
public void consumerStarted() {
}
-
+
private void flushCache() {
}
-
+
public void clear() {
queue.clear();
}
-
+
public long population() {
return queue.size();
}
-
+
public long capacity() {
long used = queue.size();
if (used < QUEUE_CAPACITY) {
@@ -159,15 +159,15 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
return used;
}
}
-
+
public long writePos() {
return 0;
}
-
+
public long readPos() {
return queue.size();
}
-
+
public float pctFull() {
long used = queue.size();
if (used < QUEUE_CAPACITY) {
@@ -176,7 +176,7 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
return 1.0f;
}
}
-
+
@Override
public Object getState() {
Map state = new HashMap<String, Object>();
@@ -189,12 +189,12 @@ public class DisruptorWrapBlockingQueue extends DisruptorQueue {
state.put("read_pos", rp);
return state;
}
-
+
public static class ObjectEventFactory implements EventFactory<MutableObject> {
@Override
public MutableObject newInstance() {
return new MutableObject();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
index 4614366..e68898e 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ExtendedThreadPoolExecutor.java
@@ -27,41 +27,44 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor{
+public class ExtendedThreadPoolExecutor extends ThreadPoolExecutor {
- public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
-
- public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- }
+ public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
- public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
- }
+ public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
- public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- }
+ public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+ }
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- if (t == null && r instanceof Future<?>) {
- try {
- Object result = ((Future<?>) r).get();
- } catch (CancellationException ce) {
- t = ce;
- } catch (ExecutionException ee) {
- t = ee.getCause();
- } catch (InterruptedException ie) {
- // If future got interrupted exception, we want to interrupt parent thread itself.
- Thread.currentThread().interrupt();
- }
+ public ExtendedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory, RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
- if (t != null) {
- Utils.handleUncaughtException(t);
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ if (t == null && r instanceof Future<?>) {
+ try {
+ Object result = ((Future<?>) r).get();
+ } catch (CancellationException ce) {
+ t = ce;
+ } catch (ExecutionException ee) {
+ t = ee.getCause();
+ } catch (InterruptedException ie) {
+ // If future got interrupted exception, we want to interrupt parent thread itself.
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (t != null) {
+ Utils.handleUncaughtException(t);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
index c0190cc..2bc6e7d 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.utils;
-
import clojure.lang.ILookup;
import clojure.lang.ISeq;
import clojure.lang.AFn;
@@ -65,16 +64,17 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
@Override
public Object valAt(Object o) {
- if(o instanceof Keyword) {
+ if (o instanceof Keyword) {
return valAt(((Keyword) o).getName());
}
return getMap().valAt(o);
}
-
+
@Override
public Object valAt(Object o, Object def) {
Object ret = valAt(o);
- if(ret==null) ret = def;
+ if (ret == null)
+ ret = def;
return ret;
}
@@ -92,30 +92,35 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
/* IPersistentMap */
/* Naive implementation, but it might be good enough */
public IPersistentMap assoc(Object k, Object v) {
- if(k instanceof Keyword) return assoc(((Keyword) k).getName(), v);
-
+ if (k instanceof Keyword)
+ return assoc(((Keyword) k).getName(), v);
+
return new IndifferentAccessMap(getMap().assoc(k, v));
}
public IPersistentMap assocEx(Object k, Object v) {
- if(k instanceof Keyword) return assocEx(((Keyword) k).getName(), v);
+ if (k instanceof Keyword)
+ return assocEx(((Keyword) k).getName(), v);
return new IndifferentAccessMap(getMap().assocEx(k, v));
}
public IPersistentMap without(Object k) {
- if(k instanceof Keyword) return without(((Keyword) k).getName());
+ if (k instanceof Keyword)
+ return without(((Keyword) k).getName());
return new IndifferentAccessMap(getMap().without(k));
}
public boolean containsKey(Object k) {
- if(k instanceof Keyword) return containsKey(((Keyword) k).getName());
+ if (k instanceof Keyword)
+ return containsKey(((Keyword) k).getName());
return getMap().containsKey(k);
}
public IMapEntry entryAt(Object k) {
- if(k instanceof Keyword) return entryAt(((Keyword) k).getName());
+ if (k instanceof Keyword)
+ return entryAt(((Keyword) k).getName());
return getMap().entryAt(k);
}
@@ -160,17 +165,20 @@ public class IndifferentAccessMap extends AFn implements ILookup, IPersistentMap
public Collection values() {
return ((Map) getMap()).values();
}
-
+
/* Not implemented */
public void clear() {
throw new UnsupportedOperationException();
}
+
public Object put(Object k, Object v) {
throw new UnsupportedOperationException();
}
+
public void putAll(Map m) {
throw new UnsupportedOperationException();
}
+
public Object remove(Object k) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
index b20c775..03c8e4b 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/InprocMessaging.java
@@ -25,32 +25,32 @@ public class InprocMessaging {
private static Map<Integer, LinkedBlockingQueue<Object>> _queues = new HashMap<Integer, LinkedBlockingQueue<Object>>();
private static final Object _lock = new Object();
private static int port = 1;
-
+
public static int acquireNewPort() {
int ret;
- synchronized(_lock) {
+ synchronized (_lock) {
ret = port;
port++;
}
return ret;
}
-
+
public static void sendMessage(int port, Object msg) {
getQueue(port).add(msg);
}
-
+
public static Object takeMessage(int port) throws InterruptedException {
return getQueue(port).take();
}
public static Object pollMessage(int port) {
- return getQueue(port).poll();
- }
-
+ return getQueue(port).poll();
+ }
+
private static LinkedBlockingQueue<Object> getQueue(int port) {
- synchronized(_lock) {
- if(!_queues.containsKey(port)) {
- _queues.put(port, new LinkedBlockingQueue<Object>());
+ synchronized (_lock) {
+ if (!_queues.containsKey(port)) {
+ _queues.put(port, new LinkedBlockingQueue<Object>());
}
return _queues.get(port);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
index 3cb455d..661c045 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/KeyedRoundRobinQueue.java
@@ -33,9 +33,9 @@ public class KeyedRoundRobinQueue<V> {
private int _currIndex = 0;
public void add(Object key, V val) {
- synchronized(_lock) {
+ synchronized (_lock) {
Queue<V> queue = _queues.get(key);
- if(queue==null) {
+ if (queue == null) {
queue = new LinkedList<V>();
_queues.put(key, queue);
_keyOrder.add(key);
@@ -47,14 +47,14 @@ public class KeyedRoundRobinQueue<V> {
public V take() throws InterruptedException {
_size.acquire();
- synchronized(_lock) {
+ synchronized (_lock) {
Object key = _keyOrder.get(_currIndex);
Queue<V> queue = _queues.get(key);
V ret = queue.remove();
- if(queue.isEmpty()) {
+ if (queue.isEmpty()) {
_keyOrder.remove(_currIndex);
_queues.remove(key);
- if(_keyOrder.size()==0) {
+ if (_keyOrder.size() == 0) {
_currIndex = 0;
} else {
_currIndex = _currIndex % _keyOrder.size();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
index 1e091f0..25e6878 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ListDelegate.java
@@ -25,11 +25,11 @@ import java.util.ListIterator;
public class ListDelegate implements List<Object> {
private List<Object> _delegate;
-
+
public ListDelegate() {
- _delegate = new ArrayList<Object>();
+ _delegate = new ArrayList<Object>();
}
-
+
public void setDelegate(List<Object> delegate) {
_delegate = delegate;
}
@@ -37,7 +37,7 @@ public class ListDelegate implements List<Object> {
public List<Object> getDelegate() {
return _delegate;
}
-
+
@Override
public int size() {
return _delegate.size();
@@ -152,5 +152,5 @@ public class ListDelegate implements List<Object> {
public List<Object> subList(int i, int i1) {
return _delegate.subList(i, i1);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
index 0d8292f..843efb4 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/LocalState.java
@@ -25,12 +25,9 @@ import java.util.HashMap;
import java.io.IOException;
/**
- * A simple, durable, atomic K/V database. *Very inefficient*, should only be
- * used for occasional reads/writes. Every read/write hits disk.
+ * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.
*
- * @@@
- * Right now, This class hasn't upgrade to storm's LocalState
- * It is need define every type in thrift, it is too complicated to do
+ * @@@ Right now, This class hasn't upgrade to storm's LocalState It is need define every type in thrift, it is too complicated to do
*/
public class LocalState {
private VersionedStore _vs;
@@ -46,8 +43,7 @@ public class LocalState {
if (latestPath == null)
return new HashMap<Object, Object>();
try {
- return (Map<Object, Object>) Utils.javaDeserialize(FileUtils
- .readFileToByteArray(new File(latestPath)));
+ return (Map<Object, Object>) Utils.javaDeserialize(FileUtils.readFileToByteArray(new File(latestPath)));
} catch (IOException e) {
attempts++;
if (attempts >= 10) {
@@ -65,8 +61,7 @@ public class LocalState {
put(key, val, true);
}
- public synchronized void put(Object key, Object val, boolean cleanup)
- throws IOException {
+ public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.put(key, val);
persist(curr, cleanup);
@@ -76,8 +71,7 @@ public class LocalState {
remove(key, true);
}
- public synchronized void remove(Object key, boolean cleanup)
- throws IOException {
+ public synchronized void remove(Object key, boolean cleanup) throws IOException {
Map<Object, Object> curr = snapshot();
curr.remove(key);
persist(curr, cleanup);
@@ -87,8 +81,7 @@ public class LocalState {
_vs.cleanup(keepVersions);
}
- private void persist(Map<Object, Object> val, boolean cleanup)
- throws IOException {
+ private void persist(Map<Object, Object> val, boolean cleanup) throws IOException {
byte[] toWrite = Utils.serialize(val);
String newPath = _vs.createVersion();
FileUtils.writeByteArrayToFile(new File(newPath), toWrite);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
index eb57e99..b725084 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/Monitor.java
@@ -17,16 +17,17 @@
*/
package backtype.storm.utils;
-import backtype.storm.generated.*;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.TopologySummary;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
/**
* Deprecated in JStorm
+ *
* @author zhongyan.feng
- *
+ *
*/
@Deprecated
public class Monitor {
@@ -106,17 +107,17 @@ public class Monitor {
/**
* @@@ Don't be compatible with Storm
*
- * Here skip the logic
+ * Here skip the logic
* @param client
* @param topology
* @return
* @throws Exception
*/
- private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception{
+ private HashSet<String> getComponents(Nimbus.Client client, String topology) throws Exception {
HashSet<String> components = new HashSet<String>();
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
- for (TopologySummary ts: clusterSummary.get_topologies()) {
+ for (TopologySummary ts : clusterSummary.get_topologies()) {
if (topology.equals(ts.get_name())) {
topologySummary = ts;
break;
@@ -126,12 +127,12 @@ public class Monitor {
throw new IllegalArgumentException("topology: " + topology + " not found");
} else {
String id = topologySummary.get_id();
-// GetInfoOptions getInfoOpts = new GetInfoOptions();
-// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
-// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
-// for (ExecutorSummary es: info.get_executors()) {
-// components.add(es.get_component_id());
-// }
+ // GetInfoOptions getInfoOpts = new GetInfoOptions();
+ // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+ // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
+ // for (ExecutorSummary es: info.get_executors()) {
+ // components.add(es.get_component_id());
+ // }
}
return components;
}
@@ -161,7 +162,7 @@ public class Monitor {
throw new IllegalArgumentException("stream name must be something");
}
- if ( !WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
+ if (!WATCH_TRANSFERRED.equals(_watch) && !WATCH_EMITTED.equals(_watch)) {
throw new IllegalArgumentException("watch item must either be transferred or emitted");
}
System.out.println("topology\tcomponent\tparallelism\tstream\ttime-diff ms\t" + _watch + "\tthroughput (Kt/s)");
@@ -189,7 +190,7 @@ public class Monitor {
boolean streamFound = false;
ClusterSummary clusterSummary = client.getClusterInfo();
TopologySummary topologySummary = null;
- for (TopologySummary ts: clusterSummary.get_topologies()) {
+ for (TopologySummary ts : clusterSummary.get_topologies()) {
if (_topology.equals(ts.get_name())) {
topologySummary = ts;
break;
@@ -198,30 +199,30 @@ public class Monitor {
if (topologySummary == null) {
throw new IllegalArgumentException("topology: " + _topology + " not found");
} else {
-// String id = topologySummary.get_id();
-// GetInfoOptions getInfoOpts = new GetInfoOptions();
-// getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
-// TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
-// for (ExecutorSummary es: info.get_executors()) {
-// if (_component.equals(es.get_component_id())) {
-// componentParallelism ++;
-// ExecutorStats stats = es.get_stats();
-// if (stats != null) {
-// Map<String,Map<String,Long>> statted =
-// WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
-// if ( statted != null) {
-// Map<String, Long> e2 = statted.get(":all-time");
-// if (e2 != null) {
-// Long stream = e2.get(_stream);
-// if (stream != null){
-// streamFound = true;
-// totalStatted += stream;
-// }
-// }
-// }
-// }
-// }
-// }
+ // String id = topologySummary.get_id();
+ // GetInfoOptions getInfoOpts = new GetInfoOptions();
+ // getInfoOpts.set_num_err_choice(NumErrorsChoice.NONE);
+ // TopologyInfo info = client.getTopologyInfoWithOpts(id, getInfoOpts);
+ // for (ExecutorSummary es: info.get_executors()) {
+ // if (_component.equals(es.get_component_id())) {
+ // componentParallelism ++;
+ // ExecutorStats stats = es.get_stats();
+ // if (stats != null) {
+ // Map<String,Map<String,Long>> statted =
+ // WATCH_EMITTED.equals(_watch) ? stats.get_emitted() : stats.get_transferred();
+ // if ( statted != null) {
+ // Map<String, Long> e2 = statted.get(":all-time");
+ // if (e2 != null) {
+ // Long stream = e2.get(_stream);
+ // if (stream != null){
+ // streamFound = true;
+ // totalStatted += stream;
+ // }
+ // }
+ // }
+ // }
+ // }
+ // }
}
if (componentParallelism <= 0) {
@@ -242,8 +243,9 @@ public class Monitor {
long stattedDelta = totalStatted - state.getLastStatted();
state.setLastTime(now);
state.setLastStatted(totalStatted);
- double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double)stattedDelta/(double)timeDelta);
- System.out.println(_topology+"\t"+_component+"\t"+componentParallelism+"\t"+_stream+"\t"+timeDelta+"\t"+stattedDelta+"\t"+throughput);
+ double throughput = (stattedDelta == 0 || timeDelta == 0) ? 0.0 : ((double) stattedDelta / (double) timeDelta);
+ System.out.println(_topology + "\t" + _component + "\t" + componentParallelism + "\t" + _stream + "\t" + timeDelta + "\t" + stattedDelta + "\t"
+ + throughput);
}
public void set_interval(int _interval) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
index 326ade0..aca3a24 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableInt.java
@@ -23,21 +23,21 @@ public class MutableInt {
public MutableInt(int val) {
this.val = val;
}
-
+
public void set(int val) {
this.val = val;
}
-
+
public int get() {
return val;
}
-
+
public int increment() {
return increment(1);
}
-
+
public int increment(int amt) {
- val+=amt;
+ val += amt;
return val;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
index a744c1c..2f4034e 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableLong.java
@@ -23,21 +23,21 @@ public class MutableLong {
public MutableLong(long val) {
this.val = val;
}
-
+
public void set(long val) {
this.val = val;
}
-
+
public long get() {
return val;
}
-
+
public long increment() {
return increment(1);
}
-
+
public long increment(long amt) {
- val+=amt;
+ val += amt;
return val;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
index d5cb7db..d0f928c 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/MutableObject.java
@@ -19,19 +19,19 @@ package backtype.storm.utils;
public class MutableObject {
Object o = null;
-
+
public MutableObject() {
-
+
}
public MutableObject(Object o) {
this.o = o;
}
-
+
public void setObject(Object o) {
this.o = o;
}
-
+
public Object getObject() {
return o;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
index 5829b67..ac76439 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/NimbusClient.java
@@ -31,24 +31,24 @@ import backtype.storm.security.auth.ThriftConnectionType;
public class NimbusClient extends ThriftClient {
private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class);
-
+
private Nimbus.Client _client;
private static String clientVersion = Utils.getVersion();
-
+
@SuppressWarnings("unchecked")
public static NimbusClient getConfiguredClient(Map conf) {
return getConfiguredClient(conf, null);
}
-
+
@SuppressWarnings("unchecked")
public static NimbusClient getConfiguredClient(Map conf, Integer timeout) {
return getConfiguredClientAs(conf, timeout, null);
}
-
+
public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {
return getConfiguredClientAs(conf, null, asUser);
}
-
+
public static void checkVersion(NimbusClient client) {
String serverVersion;
try {
@@ -56,24 +56,24 @@ public class NimbusClient extends ThriftClient {
} catch (TException e) {
// TODO Auto-generated catch block
LOG.warn("Failed to get nimbus version ");
- return ;
+ return;
}
if (!clientVersion.equals(serverVersion)) {
LOG.warn("Your client version: " + clientVersion + " but nimbus version: " + serverVersion);
}
}
-
+
public static NimbusClient getConfiguredClientAs(Map conf, Integer timeout, String asUser) {
try {
- if(conf.containsKey(Config.STORM_DO_AS_USER)) {
- if(asUser != null && !asUser.isEmpty()) {
- LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."
- , asUser, conf.get(Config.STORM_DO_AS_USER));
+ if (conf.containsKey(Config.STORM_DO_AS_USER)) {
+ if (asUser != null && !asUser.isEmpty()) {
+ LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence.", asUser,
+ conf.get(Config.STORM_DO_AS_USER));
}
asUser = (String) conf.get(Config.STORM_DO_AS_USER);
}
-
- NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser);
+
+ NimbusClient client = new NimbusClient(conf, null, null, timeout, asUser);
checkVersion(client);
return client;
} catch (Exception ex) {
@@ -84,24 +84,24 @@ public class NimbusClient extends ThriftClient {
public NimbusClient(Map conf, String host, int port) throws TTransportException {
this(conf, host, port, null);
}
-
+
public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null);
_client = new Nimbus.Client(_protocol);
}
-
+
public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser);
_client = new Nimbus.Client(_protocol);
}
-
+
public NimbusClient(Map conf, String host) throws TTransportException {
super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
_client = new Nimbus.Client(_protocol);
}
-
+
public Nimbus.Client getClient() {
return _client;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
index 48053fc..fbaf03b 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/RegisteredGlobalState.java
@@ -21,44 +21,42 @@ import java.util.HashMap;
import java.util.UUID;
/**
- * This class is used as part of testing Storm. It is used to keep track of "global metrics"
- * in an atomic way. For example, it is used for doing fine-grained detection of when a
- * local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed
- * tuples.
+ * This class is used as part of testing Storm. It is used to keep track of "global metrics" in an atomic way. For example, it is used for doing fine-grained
+ * detection of when a local Storm cluster is idle by tracking the number of transferred tuples vs the number of processed tuples.
*/
public class RegisteredGlobalState {
private static HashMap<String, Object> _states = new HashMap<String, Object>();
private static final Object _lock = new Object();
-
+
public static Object globalLock() {
return _lock;
}
-
+
public static String registerState(Object init) {
- synchronized(_lock) {
+ synchronized (_lock) {
String id = UUID.randomUUID().toString();
_states.put(id, init);
return id;
}
}
-
+
public static void setState(String id, Object init) {
- synchronized(_lock) {
+ synchronized (_lock) {
_states.put(id, init);
}
}
-
+
public static Object getState(String id) {
- synchronized(_lock) {
+ synchronized (_lock) {
Object ret = _states.get(id);
- //System.out.println("State: " + ret.toString());
+ // System.out.println("State: " + ret.toString());
return ret;
- }
+ }
}
-
+
public static void clearState(String id) {
- synchronized(_lock) {
+ synchronized (_lock) {
_states.remove(id);
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
index 2ed0e33..db62e5c 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/RotatingMap.java
@@ -24,18 +24,17 @@ import java.util.Map;
import java.util.Map.Entry;
/**
- * Expires keys that have not been updated in the configured number of seconds.
- * The algorithm used will take between expirationSecs and
- * expirationSecs * (1 + 1 / (numBuckets-1)) to actually expire the message.
- *
+ * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1
+ * / (numBuckets-1)) to actually expire the message.
+ *
* get, put, remove, containsKey, and size take O(numBuckets) time to run.
- *
- * The advantage of this design is that the expiration thread only locks the object
- * for O(1) time, meaning the object is essentially always available for gets/puts.
+ *
+ * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for
+ * gets/puts.
*/
@Deprecated
public class RotatingMap<K, V> {
- //this default ensures things expire at most 50% past the expiration time
+ // this default ensures things expire at most 50% past the expiration time
private static final int DEFAULT_NUM_BUCKETS = 3;
public static interface ExpiredCallback<K, V> {
@@ -45,13 +44,13 @@ public class RotatingMap<K, V> {
private LinkedList<HashMap<K, V>> _buckets;
private ExpiredCallback _callback;
-
+
public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback) {
- if(numBuckets<2) {
+ if (numBuckets < 2) {
throw new IllegalArgumentException("numBuckets must be >= 2");
}
_buckets = new LinkedList<HashMap<K, V>>();
- for(int i=0; i<numBuckets; i++) {
+ for (int i = 0; i < numBuckets; i++) {
_buckets.add(new HashMap<K, V>());
}
@@ -64,13 +63,13 @@ public class RotatingMap<K, V> {
public RotatingMap(int numBuckets) {
this(numBuckets, null);
- }
-
+ }
+
public Map<K, V> rotate() {
Map<K, V> dead = _buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
- if(_callback!=null) {
- for(Entry<K, V> entry: dead.entrySet()) {
+ if (_callback != null) {
+ for (Entry<K, V> entry : dead.entrySet()) {
_callback.expire(entry.getKey(), entry.getValue());
}
}
@@ -78,8 +77,8 @@ public class RotatingMap<K, V> {
}
public boolean containsKey(K key) {
- for(HashMap<K, V> bucket: _buckets) {
- if(bucket.containsKey(key)) {
+ for (HashMap<K, V> bucket : _buckets) {
+ if (bucket.containsKey(key)) {
return true;
}
}
@@ -87,8 +86,8 @@ public class RotatingMap<K, V> {
}
public V get(K key) {
- for(HashMap<K, V> bucket: _buckets) {
- if(bucket.containsKey(key)) {
+ for (HashMap<K, V> bucket : _buckets) {
+ if (bucket.containsKey(key)) {
return bucket.get(key);
}
}
@@ -99,16 +98,15 @@ public class RotatingMap<K, V> {
Iterator<HashMap<K, V>> it = _buckets.iterator();
HashMap<K, V> bucket = it.next();
bucket.put(key, value);
- while(it.hasNext()) {
+ while (it.hasNext()) {
bucket = it.next();
bucket.remove(key);
}
}
-
-
+
public Object remove(K key) {
- for(HashMap<K, V> bucket: _buckets) {
- if(bucket.containsKey(key)) {
+ for (HashMap<K, V> bucket : _buckets) {
+ if (bucket.containsKey(key)) {
return bucket.remove(key);
}
}
@@ -117,9 +115,9 @@ public class RotatingMap<K, V> {
public int size() {
int size = 0;
- for(HashMap<K, V> bucket: _buckets) {
- size+=bucket.size();
+ for (HashMap<K, V> bucket : _buckets) {
+ size += bucket.size();
}
return size;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
index 724bc3e..92dc2f7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ServiceRegistry.java
@@ -24,24 +24,24 @@ import java.util.UUID;
public class ServiceRegistry {
private static HashMap<String, Object> _services = new HashMap<String, Object>();
private static final Object _lock = new Object();
-
+
public static String registerService(Object service) {
- synchronized(_lock) {
+ synchronized (_lock) {
String id = UUID.randomUUID().toString();
_services.put(id, service);
return id;
}
}
-
+
public static Object getService(String id) {
- synchronized(_lock) {
+ synchronized (_lock) {
return _services.get(id);
- }
+ }
}
-
+
public static void unregisterService(String id) {
- synchronized(_lock) {
+ synchronized (_lock) {
_services.remove(id);
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
index 78f47d6..69af852 100644
--- a/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellProcess.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
public class ShellProcess implements Serializable {
public static Logger LOG = LoggerFactory.getLogger(ShellProcess.class);
public static Logger ShellLogger;
- private Process _subprocess;
- private InputStream processErrorStream;
- private String[] command;
- public ISerializer serializer;
+ private Process _subprocess;
+ private InputStream processErrorStream;
+ private String[] command;
+ public ISerializer serializer;
public Number pid;
public String componentName;
@@ -63,9 +63,7 @@ public class ShellProcess implements Serializable {
serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
this.pid = serializer.connect(conf, context);
} catch (IOException e) {
- throw new RuntimeException(
- "Error when launching multilang subprocess\n"
- + getErrorsString(), e);
+ throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
} catch (NoOutputException e) {
throw new RuntimeException(e + getErrorsString() + "\n");
}
@@ -73,18 +71,18 @@ public class ShellProcess implements Serializable {
}
private ISerializer getSerializer(Map conf) {
- //get factory class name
- String serializer_className = (String)conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER);
+ // get factory class name
+ String serializer_className = (String) conf.get(Config.TOPOLOGY_MULTILANG_SERIALIZER);
LOG.info("Storm multilang serializer: " + serializer_className);
ISerializer serializer = null;
try {
- //create a factory class
+ // create a factory class
Class klass = Class.forName(serializer_className);
- //obtain a serializer object
+ // obtain a serializer object
Object obj = klass.newInstance();
- serializer = (ISerializer)obj;
- } catch(Exception e) {
+ serializer = (ISerializer) obj;
+ } catch (Exception e) {
throw new RuntimeException("Failed to construct multilang serializer from serializer " + serializer_className, e);
}
return serializer;
@@ -152,7 +150,7 @@ public class ShellProcess implements Serializable {
}
/**
- *
+ *
* @return pid, if the process has been launched, null otherwise.
*/
public Number getPid() {
@@ -160,7 +158,7 @@ public class ShellProcess implements Serializable {
}
/**
- *
+ *
* @return the name of component.
*/
public String getComponentName() {
@@ -168,13 +166,13 @@ public class ShellProcess implements Serializable {
}
/**
- *
+ *
* @return exit code of the process if process is terminated, -1 if process is not started or terminated.
*/
public int getExitCode() {
try {
return this._subprocess != null ? this._subprocess.exitValue() : -1;
- } catch(IllegalThreadStateException e) {
+ } catch (IllegalThreadStateException e) {
return -1;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
index 1065ff9..261cbb7 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ShellUtils.java
@@ -31,19 +31,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-
abstract public class ShellUtils {
public static Logger LOG = LoggerFactory.getLogger(ShellUtils.class);
// OSType detection
public enum OSType {
- OS_TYPE_LINUX,
- OS_TYPE_WIN,
- OS_TYPE_SOLARIS,
- OS_TYPE_MAC,
- OS_TYPE_FREEBSD,
- OS_TYPE_OTHER
+ OS_TYPE_LINUX, OS_TYPE_WIN, OS_TYPE_SOLARIS, OS_TYPE_MAC, OS_TYPE_FREEBSD, OS_TYPE_OTHER
}
public static final OSType osType = getOSType();
@@ -69,29 +62,27 @@ abstract public class ShellUtils {
// Helper static vars for each platform
public static final boolean WINDOWS = (osType == OSType.OS_TYPE_WIN);
public static final boolean SOLARIS = (osType == OSType.OS_TYPE_SOLARIS);
- public static final boolean MAC = (osType == OSType.OS_TYPE_MAC);
+ public static final boolean MAC = (osType == OSType.OS_TYPE_MAC);
public static final boolean FREEBSD = (osType == OSType.OS_TYPE_FREEBSD);
- public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
- public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);
-
+ public static final boolean LINUX = (osType == OSType.OS_TYPE_LINUX);
+ public static final boolean OTHER = (osType == OSType.OS_TYPE_OTHER);
/** Token separator regex used to parse Shell tool outputs */
- public static final String TOKEN_SEPARATOR_REGEX
- = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+ public static final String TOKEN_SEPARATOR_REGEX = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
- private long interval; // refresh interval in msec
- private long lastTime; // last time the command was performed
+ private long interval; // refresh interval in msec
+ private long lastTime; // last time the command was performed
final private boolean redirectErrorStream; // merge stdout and stderr
private Map<String, String> environment; // env for the command execution
private File dir;
private Process process; // sub process used to execute the command
private int exitCode;
- /**Time after which the executing script would be timedout*/
+ /** Time after which the executing script would be timedout */
protected long timeOutInterval = 0L;
- /** If or not script timed out*/
+ /** If or not script timed out */
private AtomicBoolean timedOut;
- /**If or not script finished executing*/
+ /** If or not script finished executing */
private volatile AtomicBoolean completed;
public ShellUtils() {
@@ -103,23 +94,26 @@ abstract public class ShellUtils {
}
/**
- * @param interval the minimum duration to wait before re-executing the
- * command.
+ * @param interval the minimum duration to wait before re-executing the command.
*/
public ShellUtils(long interval, boolean redirectErrorStream) {
this.interval = interval;
- this.lastTime = (interval<0) ? 0 : -interval;
+ this.lastTime = (interval < 0) ? 0 : -interval;
this.redirectErrorStream = redirectErrorStream;
}
- /** set the environment for the command
+ /**
+ * set the environment for the command
+ *
* @param env Mapping of environment variables
*/
protected void setEnvironment(Map<String, String> env) {
this.environment = env;
}
- /** set the working directory
+ /**
+ * set the working directory
+ *
* @param dir The directory where the command would be executed
*/
protected void setWorkingDirectory(File dir) {
@@ -128,23 +122,18 @@ abstract public class ShellUtils {
/** a Unix command to get the current user's groups list */
public static String[] getGroupsCommand() {
- return (WINDOWS)? new String[]{"cmd", "/c", "groups"}
- : new String[]{"bash", "-c", "groups"};
+ return (WINDOWS) ? new String[] { "cmd", "/c", "groups" } : new String[] { "bash", "-c", "groups" };
}
/**
- * a Unix command to get a given user's groups list.
- * If the OS is not WINDOWS, the command will get the user's primary group
- * first and finally get the groups list which includes the primary group.
- * i.e. the user's primary group will be included twice.
+ * a Unix command to get a given user's groups list. If the OS is not WINDOWS, the command will get the user's primary group first and finally get the
+ * groups list which includes the primary group. i.e. the user's primary group will be included twice.
*/
public static String[] getGroupsForUserCommand(final String user) {
- //'groups username' command return is non-consistent across different unixes
- return new String [] {"bash", "-c", "id -gn " + user
- + "&& id -Gn " + user};
+ // 'groups username' command return is non-consistent across different unixes
+ return new String[] { "bash", "-c", "id -gn " + user + "&& id -Gn " + user };
}
-
/** check to see if a command needs to be executed and execute if needed */
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis())
@@ -174,51 +163,48 @@ abstract public class ShellUtils {
if (timeOutInterval > 0) {
timeOutTimer = new Timer("Shell command timeout");
timeoutTimerTask = new ShellTimeoutTimerTask(this);
- //One time scheduling.
+ // One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
- final BufferedReader errReader =
- new BufferedReader(new InputStreamReader(process
- .getErrorStream()));
- BufferedReader inReader =
- new BufferedReader(new InputStreamReader(process
- .getInputStream()));
+ final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+ BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
final StringBuffer errMsg = new StringBuffer();
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
- @Override
- public void run() {
- try {
- String line = errReader.readLine();
- while((line != null) && !isInterrupted()) {
- errMsg.append(line);
- errMsg.append(System.getProperty("line.separator"));
- line = errReader.readLine();
- }
- } catch(IOException ioe) {
- LOG.warn("Error reading the error stream", ioe);
+ @Override
+ public void run() {
+ try {
+ String line = errReader.readLine();
+ while ((line != null) && !isInterrupted()) {
+ errMsg.append(line);
+ errMsg.append(System.getProperty("line.separator"));
+ line = errReader.readLine();
}
+ } catch (IOException ioe) {
+ LOG.warn("Error reading the error stream", ioe);
}
- };
+ }
+ };
try {
errThread.start();
- } catch (IllegalStateException ise) { }
+ } catch (IllegalStateException ise) {
+ }
try {
parseExecResult(inReader); // parse the output
// clear the input stream buffer
String line = inReader.readLine();
- while(line != null) {
+ while (line != null) {
line = inReader.readLine();
}
// wait for the process to finish and check the exit code
- exitCode = process.waitFor();
+ exitCode = process.waitFor();
// make sure that the error thread exits
joinThread(errThread);
completed.set(true);
- //the timeout thread handling
- //taken care in finally block
+ // the timeout thread handling
+ // taken care in finally block
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
@@ -233,10 +219,10 @@ abstract public class ShellUtils {
// JDK 7 tries to automatically drain the input streams for us
// when the process exits, but since close is not synchronized,
// it creates a race if we close the stream first and the same
- // fd is recycled. the stream draining thread will attempt to
- // drain that fd!! it may block, OOM, or cause bizarre behavior
+ // fd is recycled. the stream draining thread will attempt to
+ // drain that fd!! it may block, OOM, or cause bizarre behavior
// see: https://bugs.openjdk.java.net/browse/JDK-8024521
- // issue is fixed in build 7u60
+ // issue is fixed in build 7u60
InputStream stdout = process.getInputStream();
synchronized (stdout) {
inReader.close();
@@ -278,10 +264,11 @@ abstract public class ShellUtils {
protected abstract String[] getExecString();
/** Parse the execution result */
- protected abstract void parseExecResult(BufferedReader lines)
- throws IOException;
+ protected abstract void parseExecResult(BufferedReader lines) throws IOException;
- /** get the current sub-process executing the given command
+ /**
+ * get the current sub-process executing the given command
+ *
* @return process executing the command
*/
public Process getProcess() {
@@ -306,18 +293,15 @@ abstract public class ShellUtils {
/**
* A simple shell command executor.
- *
- * <code>ShellCommandExecutor</code>should be used in cases where the output
- * of the command needs no explicit parsing and where the command, working
- * directory and the environment remains unchanged. The output of the command
- * is stored as-is and is expected to be small.
+ *
+ * <code>ShellCommandExecutor</code>should be used in cases where the output of the command needs no explicit parsing and where the command, working
+ * directory and the environment remains unchanged. The output of the command is stored as-is and is expected to be small.
*/
public static class ShellCommandExecutor extends ShellUtils {
private String[] command;
private StringBuffer output;
-
public ShellCommandExecutor(String[] execString) {
this(execString, null);
}
@@ -326,27 +310,22 @@ abstract public class ShellUtils {
this(execString, dir, null);
}
- public ShellCommandExecutor(String[] execString, File dir,
- Map<String, String> env) {
- this(execString, dir, env , 0L);
+ public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {
+ this(execString, dir, env, 0L);
}
/**
* Create a new instance of the ShellCommandExecutor to execute a command.
- *
+ *
* @param execString The command to execute with arguments
- * @param dir If not-null, specifies the directory which should be set
- * as the current working directory for the command.
- * If null, the current working directory is not modified.
- * @param env If not-null, environment of the command will include the
- * key-value pairs specified in the map. If null, the current
- * environment is not modified.
- * @param timeout Specifies the time in milliseconds, after which the
- * command will be killed and the status marked as timedout.
- * If 0, the command will not be timed out.
+ * @param dir If not-null, specifies the directory which should be set as the current working directory for the command. If null, the current working
+ * directory is not modified.
+ * @param env If not-null, environment of the command will include the key-value pairs specified in the map. If null, the current environment is not
+ * modified.
+ * @param timeout Specifies the time in milliseconds, after which the command will be killed and the status marked as timedout. If 0, the command will
+ * not be timed out.
*/
- public ShellCommandExecutor(String[] execString, File dir,
- Map<String, String> env, long timeout) {
+ public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env, long timeout) {
command = execString.clone();
if (dir != null) {
setWorkingDirectory(dir);
@@ -357,7 +336,6 @@ abstract public class ShellUtils {
timeOutInterval = timeout;
}
-
/** Execute the shell command. */
public void execute() throws IOException {
this.run();
@@ -373,21 +351,19 @@ abstract public class ShellUtils {
output = new StringBuffer();
char[] buf = new char[512];
int nRead;
- while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {
+ while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
output.append(buf, 0, nRead);
}
}
- /** Get the output of the shell command.*/
+ /** Get the output of the shell command. */
public String getOutput() {
return (output == null) ? "" : output.toString();
}
/**
- * Returns the commands of this instance.
- * Arguments with spaces in are presented with quotes round; other
- * arguments are presented raw
- *
+ * Returns the commands of this instance. Arguments with spaces in are presented with quotes round; other arguments are presented raw
+ *
* @return a string representation of the object.
*/
@Override
@@ -407,9 +383,8 @@ abstract public class ShellUtils {
}
/**
- * To check if the passed script to shell command executor timed out or
- * not.
- *
+ * To check if the passed script to shell command executor timed out or not.
+ *
* @return if the script timed out.
*/
public boolean isTimedOut() {
@@ -418,52 +393,45 @@ abstract public class ShellUtils {
/**
* Set if the command has timed out.
- *
+ *
*/
private void setTimedOut() {
this.timedOut.set(true);
}
-
/**
- * Static method to execute a shell command.
- * Covers most of the simple cases without requiring the user to implement
- * the <code>Shell</code> interface.
+ * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+ *
* @param cmd shell command to execute.
* @return the output of the executed command.
*/
- public static String execCommand(String ... cmd) throws IOException {
+ public static String execCommand(String... cmd) throws IOException {
return execCommand(null, cmd, 0L);
}
/**
- * Static method to execute a shell command.
- * Covers most of the simple cases without requiring the user to implement
- * the <code>Shell</code> interface.
+ * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+ *
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @param timeout time in milliseconds after which script should be marked timeout
* @return the output of the executed command.o
*/
- public static String execCommand(Map<String, String> env, String[] cmd,
- long timeout) throws IOException {
- ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env,
- timeout);
+ public static String execCommand(Map<String, String> env, String[] cmd, long timeout) throws IOException {
+ ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, timeout);
exec.execute();
return exec.getOutput();
}
/**
- * Static method to execute a shell command.
- * Covers most of the simple cases without requiring the user to implement
- * the <code>Shell</code> interface.
+ * Static method to execute a shell command. Covers most of the simple cases without requiring the user to implement the <code>Shell</code> interface.
+ *
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @return the output of the executed command.
*/
- public static String execCommand(Map<String,String> env, String ... cmd)
- throws IOException {
+ public static String execCommand(Map<String, String> env, String... cmd) throws IOException {
return execCommand(env, cmd, 0L);
}
@@ -484,9 +452,9 @@ abstract public class ShellUtils {
try {
p.exitValue();
} catch (Exception e) {
- //Process has not terminated.
- //So check if it has completed
- //if not just destroy it.
+ // Process has not terminated.
+ // So check if it has completed
+ // if not just destroy it.
if (p != null && !shell.completed.get()) {
shell.setTimedOut();
p.destroy();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
index 4aa5556..dd57832 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -31,12 +31,9 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
private final int linearBaseSleepMs;
/**
- * The class provides generic exponential-linear backoff retry strategy for
- * storm. It calculates threshold for exponentially increasing sleeptime
- * for retries. Beyond this threshold, the sleeptime increase is linear.
- * Also adds jitter for exponential/linear retry.
- * It guarantees currSleepTimeMs >= prevSleepTimeMs and
- * baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs
+ * The class provides generic exponential-linear backoff retry strategy for storm. It calculates threshold for exponentially increasing sleeptime for
+ * retries. Beyond this threshold, the sleeptime increase is linear. Also adds jitter for exponential/linear retry. It guarantees currSleepTimeMs >=
+ * prevSleepTimeMs and baseSleepTimeMs <= currSleepTimeMs <= maxSleepTimeMs
*/
public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries) {
@@ -44,17 +41,15 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
expRetriesThreshold = 1;
while ((1 << (expRetriesThreshold + 1)) < ((maxSleepTimeMs - baseSleepTimeMs) / 2))
expRetriesThreshold++;
- LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " +
- "the maxRetries [" + maxRetries + "]");
+ LOG.info("The baseSleepTimeMs [" + baseSleepTimeMs + "] the maxSleepTimeMs [" + maxSleepTimeMs + "] " + "the maxRetries [" + maxRetries + "]");
if (baseSleepTimeMs > maxSleepTimeMs) {
- LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " +
- "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
+ LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't be greater than " + "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
}
- if( maxRetries > 0 && maxRetries > expRetriesThreshold ) {
+ if (maxRetries > 0 && maxRetries > expRetriesThreshold) {
this.stepSize = Math.max(1, (maxSleepTimeMs - (1 << expRetriesThreshold)) / (maxRetries - expRetriesThreshold));
} else {
this.stepSize = 1;
- }
+ }
this.linearBaseSleepMs = super.getBaseSleepTimeMs() + (1 << expRetriesThreshold);
}
@@ -67,8 +62,7 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
return sleepTimeMs;
} else {
int stepJitter = random.nextInt(stepSize);
- return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
- (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
+ return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs + (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
index 276559c..f905ae4 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/TestUtils.java
@@ -25,9 +25,7 @@ import java.util.Map;
public class TestUtils extends Utils {
- public static void testSetupBuilder(CuratorFrameworkFactory.Builder
- builder, String zkStr, Map conf, ZookeeperAuthInfo auth)
- {
+ public static void testSetupBuilder(CuratorFrameworkFactory.Builder builder, String zkStr, Map conf, ZookeeperAuthInfo auth) {
setupBuilder(builder, zkStr, conf, auth);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
index e3ab03f..c43ff06 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ThreadResourceManager.java
@@ -23,14 +23,14 @@ public class ThreadResourceManager<T> {
public static interface ResourceFactory<X> {
X makeResource();
}
-
+
ResourceFactory<T> _factory;
ConcurrentLinkedQueue<T> _resources = new ConcurrentLinkedQueue<T>();
-
+
public ThreadResourceManager(ResourceFactory<T> factory) {
_factory = factory;
}
-
+
public T acquire() {
T ret = _resources.poll();
if (ret == null) {
@@ -38,7 +38,7 @@ public class ThreadResourceManager<T> {
}
return ret;
}
-
+
public void release(T resource) {
_resources.add(resource);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
index 47a48c7..c872721 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ThriftTopologyUtils.java
@@ -17,17 +17,13 @@
*/
package backtype.storm.utils;
+import backtype.storm.generated.*;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-
public class ThriftTopologyUtils {
public static Set<String> getComponentIds(StormTopology topology) {
Set<String> ret = new HashSet<String>();
@@ -37,7 +33,7 @@ public class ThriftTopologyUtils {
}
return ret;
}
-
+
public static Map<String, Object> getComponents(StormTopology topology) {
Map<String, Object> ret = new HashMap<String, Object>();
for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) {
@@ -46,7 +42,7 @@ public class ThriftTopologyUtils {
}
return ret;
}
-
+
public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) {
for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) {
Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);