You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:37 UTC

[2/9] storm git commit: STORM-2306 - Messaging subsystem redesign. New Backpressure model.

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
deleted file mode 100644
index 3779505..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.LiteBlockingWaitStrategy;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A single consumer queue that uses the LMAX Disruptor. The key to the performance is
- * the ability to catch up to the producer by processing tuples in batches.
- */
-public class DisruptorQueue implements IStatefulObject {
-    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);    
-    private static final Object INTERRUPT = new Object(); // sentinel: haltWithInterrupt() publishes it; consumeBatchToCursor() throws InterruptedException when it reads it
-    private static final String PREFIX = "disruptor-"; // prepended to the caller-supplied queue name in the constructor
-    private static final FlusherPool FLUSHER = new FlusherPool(); // static, so shared by all DisruptorQueue instances
-    
-    private static int getNumFlusherPoolThreads() { // resolves the maximum size of the flusher thread pool: built-in value, then Storm config, then system property
-        int numThreads = 100; // starting value; also passed to ObjectReader.getInt as its second argument (presumably the fallback - confirm)
-        try {
-        	Map<String, Object> conf = Utils.readStormConfig();
-        	numThreads = ObjectReader.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
-        } catch (Exception e) {
-        	LOG.warn("Error while trying to read system config", e); // best effort: keep the value held so far
-        }
-        try {
-            String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads)); // when set, the system property replaces the value resolved above
-            numThreads = Integer.parseInt(threads);
-        } catch (Exception e) {
-            LOG.warn("Error while parsing number of flusher pool threads", e); // unparsable property: keep the value held so far
-        }
-        LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", numThreads);
-        return numThreads;
-    }
-
-    private static class FlusherPool { // schedules registered Flushers: one repeating TimerTask per flush interval hands them to a thread pool
-    	private static final String THREAD_PREFIX = "disruptor-flush";
-        private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true); // created as a daemon timer (second argument)
-        private ThreadPoolExecutor _exec; // executes the Flusher runnables submitted by invokeAll()
-        private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>(); // flush interval -> Flushers registered for that interval
-        private HashMap<Long, TimerTask> _tt = new HashMap<>(); // flush interval -> TimerTask scheduled for that interval
-
-        public FlusherPool() {
-            _exec = new ThreadPoolExecutor(1, getNumFlusherPoolThreads(), 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.DiscardPolicy()); // bounded work queue (1024); DiscardPolicy silently drops tasks the pool rejects
-            ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                    .setDaemon(true)
-                    .setNameFormat(THREAD_PREFIX + "-task-pool") // NOTE(review): no %d placeholder, so every pool thread gets the same name
-                    .build();
-            _exec.setThreadFactory(threadFactory);
-        }
-
-        public synchronized void start(Flusher flusher, final long flushInterval) { // registers flusher to be run every flushInterval (Timer period, i.e. milliseconds)
-            ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
-            if (pending == null) { // first Flusher for this interval: create its list and schedule the repeating trigger
-                pending = new ArrayList<>();
-                TimerTask t = new TimerTask() {
-                    @Override
-                    public void run() {
-                        invokeAll(flushInterval);
-                    }
-                };
-                _pendingFlush.put(flushInterval, pending);
-                _timer.schedule(t, flushInterval, flushInterval); // first run after flushInterval, then repeated with the same period
-                _tt.put(flushInterval, t);
-            }
-            pending.add(flusher);
-        }
-
-        private synchronized void invokeAll(long flushInterval) { // timer callback: submits every Flusher registered for this interval to the pool
-            ArrayList<Flusher> tasks = _pendingFlush.get(flushInterval);
-            if (tasks != null) {
-                for (Flusher f: tasks) {
-                    _exec.submit(f); // the returned Future is not inspected
-                }
-            }
-        }
-
-        public synchronized void stop(Flusher flusher, long flushInterval) { // unregisters flusher; cancels the interval's trigger once no Flusher is left
-            ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
-            if (pending != null) {
-                pending.remove(flusher);
-                if (pending.size() == 0) { // last Flusher for this interval is gone
-                    _pendingFlush.remove(flushInterval);
-                    _tt.remove(flushInterval).cancel();
-                }
-            }
-        }
-    }
-
-    private static class ObjectEventFactory implements EventFactory<AtomicReference<Object>> { // factory handed to RingBuffer.create() in the constructor
-        @Override
-        public AtomicReference<Object> newInstance() {
-            return new AtomicReference<Object>(); // a fresh, empty holder on every call
-        }
-    }
-
-    private interface ThreadLocalInserter { // per-producer-thread insertion strategy; publish() keeps one instance per thread id in _batchers
-        public void add(Object obj); // accept one object for publication
-        public void forceBatch(); // make any partially collected batch eligible for the next flush
-        public void flush(boolean block); // move pending objects into the ring buffer; block selects blocking vs. non-blocking publication
-    }
-
-    private class ThreadLocalJustInserter implements ThreadLocalInserter { // chosen by publish() when _inputBatchSize is not greater than 1: publishes objects one at a time
-        private final ReentrantLock _flushLock; // lets only one thread at a time drain _overflow
-        private final ConcurrentLinkedQueue<Object> _overflow; // objects that could not be placed in the ring buffer yet
-
-        public ThreadLocalJustInserter() {
-            _flushLock = new ReentrantLock();
-            _overflow = new ConcurrentLinkedQueue<>();
-        }
-
-        //called by the main thread and should not block for an undefined period of time
-        public synchronized void add(Object obj) {
-            boolean inserted = false;
-            if (_overflow.isEmpty()) { // only try the ring buffer directly when no earlier object is still waiting in _overflow
-                try {
-                    publishDirectSingle(obj, false);
-                    inserted = true;
-                } catch (InsufficientCapacityException e) {
-                    //Ignored: non-blocking publish found no room, the object is queued in _overflow below
-                }
-            }
-
-            if (!inserted) {
-                _overflowCount.incrementAndGet();
-                _overflow.add(obj);
-            }
-
-            if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { // ring-buffer population plus overflow reached the high water mark
-                try {
-                    if (!_throttleOn) { // invoke the callback only when throttling is not already on
-                        _throttleOn = true;
-                        _cb.highWaterMark();
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException("Exception during calling highWaterMark callback!", e);
-                }
-            }
-        }
-
-        //May be called by a background thread
-        public void forceBatch() {
-            //NOOP: this inserter keeps no partial batch
-        }
-
-        //May be called by a background thread
-        public void flush(boolean block) {
-            if (block) {
-                _flushLock.lock();
-            } else if (!_flushLock.tryLock()) {
-               //Someone else is flushing so don't do anything
-               return;
-            }
-            try {
-                while (!_overflow.isEmpty()) {
-                    publishDirectSingle(_overflow.peek(), block); // peek first: the element stays queued if publishing throws
-                    _overflowCount.addAndGet(-1);
-                    _overflow.poll(); // published, now remove it from the overflow queue
-                }
-            } catch (InsufficientCapacityException e) {
-                //Ignored we should not block
-            } finally {
-                _flushLock.unlock();
-            }
-        }
-    }
-
-    private class ThreadLocalBatcher implements ThreadLocalInserter { // chosen by publish() when _inputBatchSize > 1: collects objects into batches before publishing
-        private final ReentrantLock _flushLock; // lets only one thread at a time drain _overflow
-        private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow; // complete (or forced) batches waiting for room in the ring buffer
-        private ArrayList<Object> _currentBatch; // batch currently being filled by add()
-
-        public ThreadLocalBatcher() {
-            _flushLock = new ReentrantLock();
-            _overflow = new ConcurrentLinkedQueue<ArrayList<Object>>();
-            _currentBatch = new ArrayList<Object>(_inputBatchSize);
-        }
-
-        //called by the main thread and should not block for an undefined period of time
-        public synchronized void add(Object obj) {
-            _currentBatch.add(obj);
-            _overflowCount.incrementAndGet(); // counted as overflow until its batch has been published
-            if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { // ring-buffer population plus overflow reached the high water mark
-                try {
-                    if (!_throttleOn) { // invoke the callback only when throttling is not already on
-                        _throttleOn = true;
-                        _cb.highWaterMark();
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException("Exception during calling highWaterMark callback!", e);
-                }
-            }
-            if (_currentBatch.size() >= _inputBatchSize) { // batch is full: publish it now or park it in _overflow
-                boolean flushed = false;
-                if (_overflow.isEmpty()) { // only publish directly when no earlier batch is still waiting
-                    try {
-                        publishDirect(_currentBatch, false);
-                        _overflowCount.addAndGet(0 - _currentBatch.size());
-                        _currentBatch.clear(); // same list instance is reused for the next batch
-                        flushed = true;
-                    } catch (InsufficientCapacityException e) {
-                        //Ignored we will flush later
-                    }
-                }
-
-                if (!flushed) {        // hand the full batch to _overflow and start a new list
-                    _overflow.add(_currentBatch);
-                    _currentBatch = new ArrayList<Object>(_inputBatchSize);
-                }
-            }
-        }
-
-        //May be called by a background thread
-        public synchronized void forceBatch() { // moves a partially filled batch to _overflow so that flush() will publish it
-            if (!_currentBatch.isEmpty()) {
-                _overflow.add(_currentBatch);
-                _currentBatch = new ArrayList<Object>(_inputBatchSize);
-            }
-        }
-
-        //May be called by a background thread
-        public void flush(boolean block) {
-            if (block) {
-                _flushLock.lock();
-            } else if (!_flushLock.tryLock()) {
-               //Someone else is flushing so don't do anything
-               return;
-            }
-            try {
-                while (!_overflow.isEmpty()) {
-                    publishDirect(_overflow.peek(), block); // peek first: the batch stays queued if publishing throws
-                    _overflowCount.addAndGet(0 - _overflow.poll().size()); // remove the published batch and subtract its size
-                }
-            } catch (InsufficientCapacityException e) {
-                //Ignored we should not block
-            } finally {
-                _flushLock.unlock();
-            }
-        }
-    }
-
-    private class Flusher implements Runnable { // task run by FlusherPool that pushes every producer thread's pending objects into the ring buffer
-        private AtomicBoolean _isFlushing = new AtomicBoolean(false); // prevents overlapping runs of this Flusher
-        private final long _flushInterval; // interval this Flusher is registered under in FlusherPool
-
-        public Flusher(long flushInterval, String name) { // NOTE(review): name is accepted but never used
-            _flushInterval = flushInterval;
-        }
-
-        public void run() {
-            if (_isFlushing.compareAndSet(false, true)) { // skip this run if a previous one is still in progress
-                for (ThreadLocalInserter batcher: _batchers.values()) {
-                    batcher.forceBatch();
-                    batcher.flush(true); // blocking flush
-                }
-                _isFlushing.set(false); // NOTE(review): not in a finally block; if forceBatch()/flush() throws, the flag stays true and later runs do nothing
-            }
-        }
-
-        public void start() { // registers with the shared FlusherPool
-            FLUSHER.start(this, _flushInterval);
-        }
-
-        public void close() { // unregisters from the shared FlusherPool
-            FLUSHER.stop(this, _flushInterval);
-        }
-    }
-
-    /**
-     * This inner class provides methods to access the metrics of the disruptor queue.
-     */
-    public class QueueMetrics {
-        private final RateTracker _rateTracker = new RateTracker(10000, 10); // fed by notifyArrivals(); its reportRate() is exposed as "arrival_rate_secs"
-
-        public long writePos() { // current cursor of the ring buffer
-            return _buffer.getCursor();
-        }
-
-        public long readPos() { // current value of the consumer sequence
-            return _consumer.get();
-        }
-
-        public long overflow() { // number of objects counted in _overflowCount
-            return _overflowCount.get();
-        }
-
-        public long population() { // ring-buffer entries between the consumer sequence and the cursor
-            return writePos() - readPos();
-        }
-
-        public long capacity() { // size of the ring buffer
-            return _buffer.getBufferSize();
-        }
-
-        public float pctFull() { // population as a fraction of capacity (not a 0-100 percentage)
-            return (1.0F * population() / capacity());
-        }
-
-        public Object getState() { // returns a Map of metric name -> value describing the queue at this moment
-            Map state = new HashMap<String, Object>();
-
-            // get readPos then writePos so it's never an under-estimate
-            long rp = readPos();
-            long wp = writePos();
-
-            final long tuplePop = tuplePopulation.get(); // tuples counted by notifyArrivals() minus those counted by notifyDepartures()
-
-            final double arrivalRateInSecs = _rateTracker.reportRate();
-
-            //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
-            // If this assumption does not hold, the calculation of sojourn time should also consider
-            // departure rate according to Queuing Theory.
-            final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; // Math.max guards against division by zero
-
-            state.put("capacity", capacity());
-            state.put("population", wp - rp);
-            state.put("tuple_population", tuplePop);
-            state.put("write_pos", wp);
-            state.put("read_pos", rp);
-            state.put("arrival_rate_secs", arrivalRateInSecs);
-            state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
-            state.put("overflow", _overflowCount.get());
-
-            return state;
-        }
-
-        public void notifyArrivals(long counts) { // called by publishDirect()/publishDirectSingle() with the number of tuples published
-            _rateTracker.notify(counts);
-            tuplePopulation.getAndAdd(counts);
-        }
-
-        public void notifyDepartures(long counts) { // called by consumeBatchToCursor() with the number of tuples consumed
-            tuplePopulation.getAndAdd(-counts);
-        }
-
-        public void close() { // closes the rate tracker
-            _rateTracker.close();
-        }
-    }
-
-    private final RingBuffer<AtomicReference<Object>> _buffer; // the Disruptor ring buffer; each slot holds the published object in an AtomicReference
-    private final Sequence _consumer; // read position of the single consumer; registered as a gating sequence in the constructor
-    private final SequenceBarrier _barrier; // used by consumeBatchWhenAvailable() to wait for published entries
-    private final int _inputBatchSize; // producer-side batch size, clamped in the constructor to [1, size/2]
-    private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>(); // thread id -> that thread's inserter
-    private final Flusher _flusher; // periodic flusher started in the constructor and closed in haltWithInterrupt()
-    private final QueueMetrics _metrics;
-
-    private String _queueName = ""; // PREFIX + the name passed to the constructor
-    private DisruptorBackpressureCallback _cb = null; // set through registerBackpressureCallback(); backpressure checks are skipped while null
-    private int _highWaterMark = 0; // absolute entry count, see setHighWaterMark()
-    private int _lowWaterMark = 0; // absolute entry count, see setLowWaterMark()
-    private boolean _enableBackpressure = false;
-    private final AtomicLong _overflowCount = new AtomicLong(0); // objects accepted by an inserter but not yet in the ring buffer
-    private final AtomicLong tuplePopulation = new AtomicLong(0); // tuples published minus tuples consumed, maintained by QueueMetrics
-    private volatile boolean _throttleOn = false; // true between a highWaterMark() and the following lowWaterMark() callback
-
-    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) { // builds the ring buffer, registers the consumer sequence and starts the periodic flusher
-        this._queueName = PREFIX + queueName;
-        WaitStrategy wait;
-        if (readTimeout <= 0) {
-            wait = new LiteBlockingWaitStrategy(); // no read timeout requested
-        } else {
-            wait = new TimeoutBlockingWaitStrategy(readTimeout, TimeUnit.MILLISECONDS); // readTimeout is interpreted as milliseconds
-        }
-
-        _buffer = RingBuffer.create(type, new ObjectEventFactory(), size, wait);
-        _consumer = new Sequence();
-        _barrier = _buffer.newBarrier();
-        _buffer.addGatingSequences(_consumer); // registers _consumer as a gating sequence of the ring buffer
-        _metrics = new QueueMetrics();
-        //The batch size can be no larger than half the full queue size.
-        //This is mostly to avoid contention issues.
-        _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); // also never smaller than 1
-
-        _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); // flush interval is clamped to at least 1
-        _flusher.start();
-    }
-
-    public DisruptorQueue(String queueName,  int size, long readTimeout, int inputBatchSize, long flushInterval) { // convenience overload that uses ProducerType.MULTI
-        this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, flushInterval);
-    }
-
-    public String getName() { // queue name including the "disruptor-" prefix
-        return _queueName;
-    }
-
-    public boolean isFull() { // true when ring-buffer population plus overflow count reaches the ring-buffer capacity
-        return (_metrics.population() + _overflowCount.get()) >= _metrics.capacity();
-    }
-
-    public void haltWithInterrupt() { // publishes the INTERRUPT sentinel, then stops the flusher and closes the metrics
-        try {
-            publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true); // blocking publish (block == true)
-            _flusher.close();
-            _metrics.close();
-        } catch (InsufficientCapacityException e) {
-            //This should be impossible
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void consumeBatch(EventHandler<Object> handler) { // consumes only when the ring buffer already holds entries; otherwise returns immediately
-        if (_metrics.population() > 0) {
-            consumeBatchWhenAvailable(handler);
-        }
-    }
-
-    public void consumeBatchWhenAvailable(EventHandler<Object> handler) { // waits on the barrier for the next sequence, then hands everything available to handler
-        try {
-            final long nextSequence = _consumer.get() + 1;
-            long availableSequence = _barrier.waitFor(nextSequence);
-
-            if (availableSequence >= nextSequence) { // at least one new entry is available
-                consumeBatchToCursor(availableSequence, handler);
-            }
-        } catch (TimeoutException te) {
-            //Ignored: the wait timed out, return without consuming
-        } catch (AlertException e) {
-            throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e); // NOTE(review): the thread's interrupt status is not restored before rethrowing
-        }
-    }
-
-    private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) { // delivers every entry from the consumer position + 1 up to and including cursor to handler
-        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
-            try {
-                AtomicReference<Object> mo = _buffer.get(curr);
-                Object o = mo.getAndSet(null); // take the payload and clear the slot
-                if (o == INTERRUPT) {
-                    throw new InterruptedException("Disruptor processing interrupted"); // caught by the outer catch below and rethrown wrapped in a RuntimeException
-                } else if (o == null) {
-                    LOG.error("NULL found in {}:{}", this.getName(), cursor); // NOTE(review): logs the batch end (cursor), not the slot being read (curr)
-                } else {
-                    _metrics.notifyDepartures(getTupleCount(o));
-                    handler.onEvent(o, curr, curr == cursor); // third argument marks the last entry of this batch
-                    if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) { // remaining entries plus overflow dropped to the low water mark
-                        try {
-                            if (_throttleOn) { // invoke the callback only when throttling is currently on
-                                _throttleOn = false;
-                                _cb.lowWaterMark();
-                            }
-                        } catch (Exception e) {
-                            throw new RuntimeException("Exception during calling lowWaterMark callback!"); // NOTE(review): cause e is not attached here (the highWaterMark paths do attach it)
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        _consumer.set(cursor); // consumer sequence advances once per batch; not reached when an exception escapes the loop
-    }
-
-    public void registerBackpressureCallback(DisruptorBackpressureCallback cb) { // installs the callback invoked at the high/low water marks
-        this._cb = cb;
-    }
-
-    private static Long getId() { // id of the calling thread; used as the key into _batchers
-        return Thread.currentThread().getId();
-    }
-
-    private long getTupleCount(Object obj) { // number of tuples represented by one published object
-        //a published object could be an instance of either AddressedTuple, ArrayList<AddressedTuple>, or HashMap<Integer, ArrayList<TaskMessage>>.
-        long tupleCount;
-        if (obj instanceof ArrayList) {
-            tupleCount = ((ArrayList) obj).size();
-        } else if (obj instanceof HashMap) {
-            tupleCount = 0;
-            for (Object value:((HashMap) obj).values()) {
-                tupleCount += ((ArrayList) value).size(); // every map value is cast to ArrayList; any other value type would throw ClassCastException
-            }
-        } else {
-            tupleCount = 1; // anything else counts as a single tuple
-        }
-        return tupleCount;
-    }
-
-    private void publishDirectSingle(Object obj, boolean block) throws InsufficientCapacityException { // claims one ring-buffer slot, stores obj in it and publishes it
-        long at;
-        long numberOfTuples;
-        if (block) {
-            at = _buffer.next(); // blocking path
-        } else {
-            at = _buffer.tryNext(); // non-blocking path: source of the declared InsufficientCapacityException
-        }
-        AtomicReference<Object> m = _buffer.get(at);
-        m.set(obj);
-        _buffer.publish(at);
-        numberOfTuples = getTupleCount(obj);
-        _metrics.notifyArrivals(numberOfTuples); // here arrivals are recorded after publishing
-    }
-
-    private void publishDirect(ArrayList<Object> objs, boolean block) throws InsufficientCapacityException { // claims objs.size() slots, fills them in list order and publishes the range; no-op for an empty list
-        int size = objs.size();
-        if (size > 0) {
-            long end; // highest sequence of the claimed range
-            if (block) {
-                end = _buffer.next(size); // blocking path
-            } else {
-                end = _buffer.tryNext(size); // non-blocking path: source of the declared InsufficientCapacityException
-            }
-            long begin = end - (size - 1); // lowest sequence of the claimed range
-            long at = begin;
-            long numberOfTuples = 0;
-            for (Object obj: objs) {
-                AtomicReference<Object> m = _buffer.get(at);
-                m.set(obj);
-                at++;
-                numberOfTuples += getTupleCount(obj);
-            }
-            _metrics.notifyArrivals(numberOfTuples); // here arrivals are recorded before publishing
-            _buffer.publish(begin, end);
-        }
-    }
-
-    public void publish(Object obj) { // producer entry point: routes obj through the calling thread's inserter
-        Long id = getId();
-        ThreadLocalInserter batcher = _batchers.get(id);
-        if (batcher == null) { // first publish from this thread: create its inserter lazily
-            //This thread is the only one ever creating this, so this is safe
-            if (_inputBatchSize > 1) {
-                batcher = new ThreadLocalBatcher();
-            } else {
-                batcher = new ThreadLocalJustInserter();
-            }
-            _batchers.put(id, batcher); // NOTE(review): entries are never removed in the visible code
-        }
-        batcher.add(obj);
-        batcher.flush(false); // non-blocking attempt to drain anything waiting in the inserter's overflow
-    }
-
-    @Override
-    public Object getState() { // IStatefulObject: delegates to QueueMetrics.getState()
-        return _metrics.getState();
-    }
-
-    public DisruptorQueue setHighWaterMark(double highWaterMark) { // highWaterMark is multiplied by capacity and stored as an entry count (truncated to int); returns this for chaining
-        this._highWaterMark = (int)(_metrics.capacity() * highWaterMark);
-        return this;
-    }
-
-    public DisruptorQueue setLowWaterMark(double lowWaterMark) { // lowWaterMark is multiplied by capacity and stored as an entry count (truncated to int); returns this for chaining
-        this._lowWaterMark = (int)(_metrics.capacity() * lowWaterMark);
-        return this;
-    }
-
-    public int getHighWaterMark() { // stored entry count, not the fraction passed to the setter
-        return this._highWaterMark;
-    }
-
-    public int getLowWaterMark() { // stored entry count, not the fraction passed to the setter
-        return this._lowWaterMark;
-    }
-
-    public DisruptorQueue setEnableBackpressure(boolean enableBackpressure) { // toggles the water-mark checks in the inserters and the consumer; returns this for chaining
-        this._enableBackpressure = enableBackpressure;
-        return this;
-    }
-
-    //This method enables the metrics to be accessed from outside of the DisruptorQueue class
-    public QueueMetrics getMetrics() {
-        return _metrics;
-    }
-
-	public boolean getThrottleOn() { // true after highWaterMark() was signalled and before lowWaterMark() is signalled
-	    return _throttleOn;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
new file mode 100644
index 0000000..26b0ac4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class JCQueue implements IStatefulObject {
+    private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+
+    public static final Object INTERRUPT = new Object(); // public sentinel object; its use is not visible in this part of the file
+
+    private final ExitCondition continueRunning = () -> true; // ExitCondition that always yields true
+
+    private interface Inserter { // producer-side insertion strategy; implemented below by DirectInserter and BatchInserter
+        // blocking call that can be interrupted using Thread.interrupt()
+        void publish(Object obj) throws InterruptedException;
+        boolean tryPublish(Object obj); // non-blocking; the return value reports success/failure
+
+        void flush() throws InterruptedException; // blocking flush of anything the inserter has buffered
+        boolean tryFlush(); // non-blocking flush; the return value reports success/failure
+    }
+
+    /* Thread safe. Same instance can be used across multiple threads */
+    private static class DirectInserter implements Inserter { // unbatched inserter: every object goes straight to q.tryPublishInternal()
+        private JCQueue q;
+
+        public DirectInserter(JCQueue q) {
+            this.q = q;
+        }
+
+        /** Blocking call, that can be interrupted via Thread.interrupt */
+        @Override
+        public void publish(Object obj) throws InterruptedException {
+            boolean inserted = q.tryPublishInternal(obj);
+            int idleCount = 0;
+            while (!inserted) { // retry until the queue accepts the object
+                q.metrics.notifyInsertFailure();
+                if (idleCount==0) { // check avoids multiple log msgs when in a idle loop
+                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
+                }
+
+                idleCount = q.backPressureWaitStrategy.idle(idleCount); // the wait strategy returns the counter to pass to the next idle() call
+                if (Thread.interrupted()) { // Thread.interrupted() also clears the interrupt flag
+                    throw new InterruptedException();
+                }
+                inserted = q.tryPublishInternal(obj);
+            }
+
+        }
+
+        /** Non-Blocking call. return value indicates success/failure */
+        @Override
+        public boolean tryPublish(Object obj) {
+            boolean inserted = q.tryPublishInternal(obj);
+            if (!inserted) {
+                q.metrics.notifyInsertFailure(); // a failed attempt is recorded in the insert-failure metric
+                return false;
+            }
+            return true;
+        }
+
+        @Override
+        public void flush() throws InterruptedException { // nothing is buffered by this inserter, so there is nothing to flush
+            return;
+        }
+
+        @Override
+        public boolean tryFlush() { // nothing is buffered by this inserter, so this always succeeds
+            return true;
+        }
+    } // class DirectInserter
+
+    /* Not thread safe. Have one instance per producer thread or synchronize externally */
+    private static class BatchInserter implements Inserter { // buffers objects in currentBatch and publishes them to the queue in bulk
+        private JCQueue q;
+        private final int batchSz; // number of buffered objects that triggers a flush
+        private ArrayList<Object> currentBatch; // objects accepted but not yet published to q
+
+        public BatchInserter(JCQueue q, int batchSz) {
+            this.q = q;
+            this.batchSz = batchSz;
+            this.currentBatch = new ArrayList<>(batchSz + 1);
+        }
+
+        /** Blocking call - buffers the element and, once the batch is full, blocks in flush() */
+        @Override
+        public void publish(Object obj) throws InterruptedException {
+            currentBatch.add(obj);
+            if (currentBatch.size() >= batchSz) {
+                flush();
+            }
+        }
+
+        /** Non-Blocking call. return value indicates success/failure */
+        @Override
+        public boolean tryPublish(Object obj) {
+            if (currentBatch.size() >= batchSz) { // batch already full: make room first, reject obj if nothing could be flushed
+                if (!tryFlush()) {
+                    return false;
+                }
+            }
+            currentBatch.add(obj);
+            return true;
+        }
+
+        /** Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received.
+         *    Uses backpressure wait strategy. */
+        @Override
+        public void flush() throws InterruptedException {
+            if (currentBatch.isEmpty()) {
+                return;
+            }
+            int publishCount = q.tryPublishInternal(currentBatch); // presumably the number of leading elements accepted - confirm against tryPublishInternal
+            int retryCount = 0;
+            while (publishCount == 0) { // retry till at least 1 element is drained
+                q.metrics.notifyInsertFailure();
+                if (retryCount==0) { // check avoids multiple log msgs when in a idle loop
+                    LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", q.getName());
+                }
+                retryCount = q.backPressureWaitStrategy.idle(retryCount);
+                if (Thread.interrupted()) { // Thread.interrupted() also clears the interrupt flag
+                    throw new InterruptedException();
+                }
+                publishCount = q.tryPublishInternal(currentBatch);
+            }
+            currentBatch.subList(0, publishCount).clear(); // drop the first publishCount elements; the rest stays buffered
+        }
+
+        /** Non blocking call. tries to flush as many as possible. Returns true if at least one from non-empty currentBatch was flushed
+         *      or if currentBatch is empty. Returns false otherwise */
+        @Override
+        public boolean tryFlush() {
+            if (currentBatch.isEmpty()) {
+                return true;
+            }
+            int publishCount = q.tryPublishInternal(currentBatch);
+            if (publishCount == 0) {
+                q.metrics.notifyInsertFailure();
+                return false;
+            } else {
+                currentBatch.subList(0, publishCount).clear(); // drop the first publishCount elements; the rest stays buffered
+                return true;
+            }
+        }
+    } // class BatchInserter
+
+    /**
+     * This inner class provides methods to access the metrics of the JCQueue.
+     */
+    public class QueueMetrics {
+        private final RateTracker arrivalsTracker = new RateTracker(10000, 10); // fed by notifyArrivals()
+        private final RateTracker insertFailuresTracker = new RateTracker(10000, 10); // fed by notifyInsertFailure()
+        private final AtomicLong droppedMessages = new AtomicLong(0); // incremented by notifyDroppedMsg()
+
+        public long population() { // current number of elements in recvQueue
+            return recvQueue.size();
+        }
+
+        public long capacity() { // capacity of recvQueue
+            return recvQueue.capacity();
+        }
+
+        public Object getState() { // returns a HashMap of metric name -> value describing the queue at this moment
+            HashMap state = new HashMap<String, Object>();
+
+            final double arrivalRateInSecs = arrivalsTracker.reportRate();
+
+            long tuplePop = population(); // recvQueue element count
+
+            // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
+            // If this assumption does not hold, the calculation of sojourn time should also consider
+            // departure rate according to Queuing Theory.
+            final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; // Math.max guards against division by zero
+
+            long cap = capacity();
+            float pctFull = (1.0F * tuplePop / cap); // a fraction of capacity, not a 0-100 percentage
+
+            state.put("capacity", cap);
+            state.put("pct_full", pctFull);
+            state.put("population", tuplePop);
+
+            state.put("arrival_rate_secs", arrivalRateInSecs);
+            state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
+            state.put("insert_failures", insertFailuresTracker.reportRate());
+            state.put("dropped_messages", droppedMessages); // NOTE(review): stores the live AtomicLong itself, not a snapshot of its value
+            state.put("overflow", overflowQ.size());
+            return state;
+        }
+
+        public void notifyArrivals(long counts) {
+            arrivalsTracker.notify(counts);
+        }
+
+        public void notifyInsertFailure() { // records one failed insert attempt
+            insertFailuresTracker.notify(1);
+        }
+
+        public void notifyDroppedMsg() {
+            droppedMessages.incrementAndGet();
+        }
+
+        public void close() { // closes both rate trackers
+            arrivalsTracker.close();
+            insertFailuresTracker.close();
+        }
+
+    }
+
+    // Primary queue: the publish paths offer into it and consumeImpl() polls from it.
+    private final MpscArrayQueue<Object> recvQueue;
+    private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
+    // Upper bound on overflowQ.size() that tryPublishToOverflow() checks before adding.
+    // A value of 0 (or less) skips that check: it disables the limit, not the overflow queue itself.
+    private final int overflowLimit;
+
+
+    // Effective producer batch size computed in the constructor; a value of 1 selects directInserter in getInserter().
+    private final int producerBatchSz;
+    private final DirectInserter directInserter = new DirectInserter(this);
+
+    private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
+
+    private final JCQueue.QueueMetrics metrics;
+
+    private String queueName;
+    // Wait strategy the inserters idle on between publish retries.
+    private final IWaitStrategy backPressureWaitStrategy;
+
+    /**
+     * Creates a queue.
+     *
+     * @param queueName                name returned by getName()
+     * @param size                     capacity of recvQueue; also passed to the constructor of overflowQ
+     * @param overflowLimit            bound on the size of overflowQ checked by tryPublishToOverflow(); zero or a
+     *                                 negative value skips that check
+     * @param producerBatchSz          requested producer batch size; capped at size / 2 and raised to at least 1
+     * @param backPressureWaitStrategy wait strategy the inserters idle on between publish retries
+     */
+    public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy) {
+        this.queueName = queueName;
+        this.backPressureWaitStrategy = backPressureWaitStrategy;
+        this.overflowLimit = overflowLimit;
+
+        // A batch may be no larger than half of the full recvQueue size (to avoid contention issues), and never below 1.
+        final int halfOfQueue = size / 2;
+        this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, halfOfQueue));
+
+        this.recvQueue = new MpscArrayQueue<>(size);
+        this.overflowQ = new MpscUnboundedArrayQueue<>(size);
+        this.metrics = new JCQueue.QueueMetrics();
+    }
+
+    /** Returns the queue name that was passed to the constructor. */
+    public String getName() {
+        return queueName;
+    }
+
+
+    /**
+     * Tries to enqueue the INTERRUPT marker directly into recvQueue (bypassing any batching), then closes the
+     * metrics trackers whether or not the marker was accepted.
+     *
+     * @return true if the marker was enqueued, false if recvQueue rejected it
+     */
+    public boolean haltWithInterrupt() {
+        final boolean markerEnqueued = tryPublishInternal(INTERRUPT);
+        metrics.close();
+        return markerEnqueued;
+    }
+
+
+    /**
+     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
+     * Delegates to {@code consume(consumer, continueRunning)}.
+     *
+     * @param consumer receives each consumed element
+     * @return number of elements consumed
+     */
+    public int consume(JCQueue.Consumer consumer) {
+        return consume(consumer, continueRunning);
+    }
+
+    /**
+     * Non blocking. Returns immediately if Q is empty. Keeps consuming until Q is empty or
+     * exitCond.keepRunning() returns false.
+     *
+     * @param consumer receives each consumed element
+     * @param exitCond consulted before each element is taken
+     * @return number of elements consumed from Q
+     * @throws RuntimeException wrapping the InterruptedException if the underlying drain is interrupted
+     */
+    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
+        final int consumed;
+        try {
+            consumed = consumeImpl(consumer, exitCond);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        return consumed;
+    }
+
+    /** Returns the combined number of elements in recvQueue and overflowQ. */
+    public int size() {
+        final int queued = recvQueue.size();
+        final int overflowed = overflowQ.size();
+        return queued + overflowed;
+    }
+
+    /**
+     * Non blocking. Drains recvQueue until it is empty or exitCond.keepRunning() returns false, then drains at most
+     * as many elements from overflowQ as it held when that second phase started. Calls consumer.flush() once at the
+     * end if anything was consumed.
+     *
+     * @param consumer receives each drained element via accept()
+     * @param exitCond consulted before each element is taken from either queue
+     * @return number of elements consumed from both queues
+     * @throws InterruptedException if consumer.flush() throws it
+     */
+    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
+        int fromRecvQ = 0;
+        for (Object tuple; exitCond.keepRunning() && (tuple = recvQueue.poll()) != null; ++fromRecvQ) {
+            consumer.accept(tuple);
+        }
+
+        // Fix the overflow budget up front: it prevents staying stuck consuming overflow while producers keep adding.
+        final int overflowBudget = overflowQ.size();
+        int fromOverflowQ = 0;
+        while (exitCond.keepRunning() && (fromOverflowQ < overflowBudget)) {
+            consumer.accept(overflowQ.poll());
+            ++fromOverflowQ;
+        }
+
+        final int total = fromRecvQ + fromOverflowQ;
+        if (total > 0) {
+            consumer.flush();
+        }
+        return total;
+    }
+
+    /**
+     * Non blocking. Offers a single element to recvQueue and records the arrival in the metrics on success.
+     *
+     * @return true if the element was accepted, false if recvQueue is full
+     */
+    private boolean tryPublishInternal(Object obj) {
+        final boolean accepted = recvQueue.offer(obj);
+        if (accepted) {
+            metrics.notifyArrivals(1);
+        }
+        return accepted;
+    }
+
+    /**
+     * Non blocking. Offers the elements of objs to recvQueue in list order, starting at index 0, and reports the
+     * number accepted to the metrics (also when that number is 0). The list itself is not modified.
+     *
+     * @return count of how many inserts succeeded
+     */
+    private int tryPublishInternal(ArrayList<Object> objs) {
+        final int published = recvQueue.fill(new MessagePassingQueue.Supplier<Object>() {
+            private int cursor = 0; // index of the next element of objs to hand out
+
+            @Override
+            public Object get() {
+                final Object next = objs.get(cursor);
+                cursor++;
+                return next;
+            }
+        }, objs.size());
+        metrics.notifyArrivals(published);
+        return published;
+    }
+
+    /**
+     * Picks the inserter for the calling thread: the shared directInserter when batching is off
+     * (producerBatchSz is 1), otherwise this thread's BatchInserter, created on first use.
+     */
+    private Inserter getInserter() {
+        if (producerBatchSz <= 1) {
+            return directInserter;
+        }
+        BatchInserter batcher = thdLocalBatcher.get();
+        if (batcher == null) {
+            batcher = new BatchInserter(this, producerBatchSz);
+            thdLocalBatcher.set(batcher);
+        }
+        return batcher;
+    }
+
+    /**
+     * Blocking call. Keeps retrying until obj has been published. Can be interrupted via Thread.interrupt().
+     *
+     * @param obj element to publish through the calling thread's inserter
+     * @throws InterruptedException if the calling thread is interrupted while publishing
+     */
+    public void publish(Object obj) throws InterruptedException {
+        getInserter().publish(obj);
+    }
+
+    /**
+     * Non-blocking call that publishes through the calling thread's inserter.
+     *
+     * @param obj element to publish
+     * @return false if full
+     **/
+    public boolean tryPublish(Object obj) {
+        return getInserter().tryPublish(obj);
+    }
+
+    /**
+     * Non-blocking call. Bypasses any batching that may be enabled on the recvQueue. Intended for sending flush/metrics tuples.
+     *
+     * @param obj element to offer to recvQueue
+     * @return true if recvQueue accepted the element, false if it is full
+     */
+    public boolean tryPublishDirect(Object obj) {
+        return tryPublishInternal(obj);
+    }
+
+    /**
+     * Un-batched write to overflowQ. Should only be called by WorkerTransfer.
+     * The size check and the add are separate steps, so the limit is not enforced atomically.
+     *
+     * @param obj element to append to overflowQ
+     * @return false if overflowLimit is positive and overflowQ already holds at least that many elements;
+     *     true once obj has been added (always the case when overflowLimit is zero or negative)
+     */
+    public boolean tryPublishToOverflow(Object obj) {
+        final boolean limitEnabled = overflowLimit > 0;
+        if (limitEnabled && overflowQ.size() >= overflowLimit) {
+            return false;
+        }
+        overflowQ.add(obj);
+        return true;
+    }
+
+    /** Increments the dropped-message counter of this queue's metrics by one. */
+    public void recordMsgDrop() {
+        getMetrics().notifyDroppedMsg();
+    }
+
+    /** Returns true if overflowQ currently holds no elements. */
+    public boolean isEmptyOverflow() {
+        return overflowQ.isEmpty();
+    }
+
+    /** Returns the number of elements currently in overflowQ. */
+    public int getOverflowCount() {
+        return overflowQ.size();
+    }
+
+    /** Returns the number of elements currently in recvQueue (overflowQ is not included). */
+    public int getQueuedCount() {
+        return recvQueue.size();
+    }
+
+    /**
+     * Flushes the calling thread's inserter.
+     * if(batchSz>1)  : Blocking call. Does not return until at least 1 element is drained or Thread.interrupt() is received
+     * if(batchSz==1) : NO-OP. Returns immediately. doesnt throw.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while flushing
+     */
+    public void flush() throws InterruptedException {
+        getInserter().flush();
+    }
+
+    /**
+     * Non-blocking flush of the calling thread's inserter.
+     * if(batchSz>1)  : Tries to flush as many as it can. Returns true if the thread's batch was empty or at least 1
+     *                  element was flushed.
+     * if(batchSz==1) : This is a NO-OP. Returns true immediately.
+     */
+    public boolean tryFlush()  {
+        return getInserter().tryFlush();
+    }
+
+    /** Returns the metrics snapshot built by this queue's QueueMetrics.getState(). */
+    @Override
+    public Object getState() {
+        return metrics.getState();
+    }
+
+
+    /**
+     * This method enables the metrics to be accessed from outside of the JCQueue class.
+     *
+     * @return the QueueMetrics instance created in the constructor
+     */
+    public JCQueue.QueueMetrics getMetrics() {
+        return metrics;
+    }
+
+    /** Callback that receives the elements drained from the queue by consume(). */
+    public interface Consumer extends org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+        /** Called once for every element taken from the queue. */
+        void accept(Object event);
+
+        /** Called once at the end of a consume() pass in which at least one element was consumed. */
+        void flush() throws InterruptedException;
+    }
+
+
+    /** Lets the caller of consume() stop a drain pass early. */
+    public interface ExitCondition {
+        /** Checked before each element is taken; returning false ends the drain pass. */
+        boolean keepRunning();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java b/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
index ab14c49..614f25c 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
@@ -33,7 +33,7 @@ public class MutableLong {
     }
     
     public long increment() {
-        return increment(1);
+        return ++val;
     }
     
     public long increment(long amt) {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
index f50947a..a1deb04 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
@@ -76,6 +76,10 @@ public class ObjectReader {
         throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
     }
 
+    /**
+     * Converts {@code o} to a Long by delegating to {@code getLong(o, null)}.
+     *
+     * @return the converted value, or null when {@code o} is null
+     */
+    public static Long getLong(Object o) {
+        return getLong(o, null);
+    }
+
     public static Long getLong(Object o, Long defaultValue) {
         if (null == o) {
             return defaultValue;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
index 89dedd4..65f051e 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
@@ -109,7 +109,7 @@ public class RotatingMap<K, V> {
     }
     
     
-    public Object remove(K key) {
+    public V remove(K key) {
         for(HashMap<K, V> bucket: _buckets) {
             if(bucket.containsKey(key)) {
                 return bucket.remove(key);

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java
index 142c432..80ecf5d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java
@@ -145,11 +145,15 @@ public class Time {
     }
 
     public static void sleep(long ms) throws InterruptedException {
-        sleepUntil(currentTimeMillis()+ms);
+        if(ms>0) {
+            sleepUntil(currentTimeMillis() + ms);
+        }
     }
     
     public static void sleepNanos(long nanos) throws InterruptedException {
-        sleepUntilNanos(nanoTime() + nanos);
+        if(nanos>0) {
+            sleepUntilNanos(nanoTime() + nanos);
+        }
     }
 
     public static void sleepSecs (long secs) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
index f14136c..f1d30a5 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
@@ -6,135 +6,138 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.utils;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TransferDrainer {
 
-  private Map<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
-  private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-  
-  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
-    for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
-      addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
-    }
-  }
-  
-  public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
-    HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
-
-    for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
-      NodeInfo hostPort = entry.getKey();
-      IConnection connection = connections.get(hostPort);
-      if (null != connection) {
-        ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
-        Iterator<TaskMessage> iter = getBundleIterator(bundle);
-        if (null != iter && iter.hasNext()) {
-          connection.send(iter);
+    private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
+    private Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap();
+
+    // Cache the msgs grouped by destination node
+    public void add(TaskMessage taskMsg) {
+        int destId = taskMsg.task();
+        ArrayList<TaskMessage> msgs = bundles.get(destId);
+        if (msgs == null) {
+            msgs = new ArrayList<>();
+            bundles.put(destId, msgs);
         }
-      } else {
-        LOG.warn("Connection is not available for hostPort {}", hostPort);
-      }
+        msgs.add(taskMsg);
     }
-  }
-
-  private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
-    HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
-    for (Integer task : this.bundles.keySet()) {
-      NodeInfo hostPort = taskToNode.get(task);
-      if (hostPort != null) {
-        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
-          addListRefToMap(bundleMap, hostPort, chunk);
+
+    public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
+        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+        for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
+            NodeInfo node = entry.getKey();
+            IConnection conn = connections.get(node);
+            if (conn != null) {
+                ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
+                Iterator<TaskMessage> iter = getBundleIterator(bundle);
+                if (null != iter && iter.hasNext()) {
+                    conn.send(iter);
+                }
+                entry.getValue().clear();
+            } else {
+                LOG.warn("Connection not available for hostPort {}", node);
+            }
         }
-      } else {
-        LOG.warn("No remote destination available for task {}", task);
-      }
     }
-    return bundleMap;
-  }
 
-  private <T> void addListRefToMap(Map<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
-                                   T key, ArrayList<TaskMessage> tuples) {
-    ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
+    private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+        HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>();
 
-    if (null == bundle) {
-      bundle = new ArrayList<ArrayList<TaskMessage>>();
-      bundleMap.put(key, bundle);
+        for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) {
+            if (entry.getValue().isEmpty()) {
+                continue;
+            }
+            NodeInfo node = taskToNode.get(entry.getKey());
+            if (node != null) {
+                ArrayList<ArrayList<TaskMessage>> msgs = result.get(node);
+                if (msgs == null) {
+                    msgs = new ArrayList<>();
+                    result.put(node, msgs);
+                }
+                msgs.add(entry.getValue());
+            } else {
+                LOG.warn("No remote destination available for task {}", entry.getKey());
+            }
+        }
+        return result;
     }
 
-    if (null != tuples && tuples.size() > 0) {
-      bundle.add(tuples);
-    }
-  }
+    private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
 
-  private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-    
-    if (null == bundle) {
-      return null;
-    }
-    
-    return new Iterator<TaskMessage> () {
-      
-      private int offset = 0;
-      private int size = 0;
-      {
-        for (ArrayList<TaskMessage> list : bundle) {
-            size += list.size();
+        if (null == bundle) {
+            return null;
         }
-      }
-      
-      private int bundleOffset = 0;
-      private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-      
-      @Override
-      public boolean hasNext() {
-          return offset < size;
-      }
-
-      @Override
-      public TaskMessage next() {
-        TaskMessage msg;
-        if (iter.hasNext()) {
-          msg = iter.next(); 
-        } else {
-          bundleOffset++;
-          iter = bundle.get(bundleOffset).iterator();
-          msg = iter.next();
-        }
-        if (null != msg) {
-          offset++;
+
+        return new Iterator<TaskMessage>() {
+
+            private int offset = 0;
+            private int size = 0;
+            private int bundleOffset = 0;
+            private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
+
+            {
+                for (ArrayList<TaskMessage> list : bundle) {
+                    size += list.size();
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                return offset < size;
+            }
+
+            @Override
+            public TaskMessage next() {
+                TaskMessage msg;
+                if (iter.hasNext()) {
+                    msg = iter.next();
+                } else {
+                    bundleOffset++;
+                    iter = bundle.get(bundleOffset).iterator();
+                    msg = iter.next();
+                }
+                if (null != msg) {
+                    offset++;
+                }
+                return msg;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("not supported");
+            }
+        };
+    }
+
+
+    public void clear() {
+        for (ArrayList<TaskMessage> taskMessages : bundles.values()) {
+            taskMessages.clear();
         }
-        return msg;
-      }
-
-      @Override
-      public void remove() {
-        throw new RuntimeException("not supported");
-      }
-    };
-  }
-  
-  public void clear() {
-    bundles.clear();
-  }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index e20e2ee..26d267e 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -341,20 +341,22 @@ public class Utils {
      * @return the newly created thread
      * @see Thread
      */
-    public static SmartThread asyncLoop(final Callable afn,
-            boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
-            int priority, final boolean isFactory, boolean startImmediately,
-            String threadName) {
+    public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+                                        int priority, final boolean isFactory, boolean startImmediately,
+                                        String threadName) {
         SmartThread thread = new SmartThread(new Runnable() {
             public void run() {
-                Object s;
                 try {
-                    Callable fn = isFactory ? (Callable) afn.call() : afn;
-                    while ((s = fn.call()) instanceof Long) {
-                        Time.sleepSecs((Long) s);
+                    final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
+                    while (true) {
+                        final Long s = fn.call();
+                        if (s==null) // then stop running it
+                            break;
+                        if (s>0)
+                            Time.sleep(s);
                     }
                 } catch (Throwable t) {
-                    if (exceptionCauseIsInstanceOf(
+                    if (Utils.exceptionCauseIsInstanceOf(
                             InterruptedException.class, t)) {
                         LOG.info("Async loop interrupted!");
                         return;
@@ -370,7 +372,7 @@ public class Utils {
             thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                 public void uncaughtException(Thread t, Throwable e) {
                     LOG.error("Async loop died!", e);
-                    exitProcess(1, "Async loop died!");
+                    Utils.exitProcess(1, "Async loop died!");
                 }
             });
         }
@@ -1552,4 +1554,20 @@ public class Utils {
         }
         return ret;
     }
+
+    /**
+     * Converts a map keyed by integer id into a list in which the value for id {@code k} is stored at index
+     * {@code k - start}. Slots without a corresponding map entry are null. Entries whose id is below
+     * {@code start} are skipped (and logged at debug level).
+     *
+     * @param srcMap map from id to value
+     * @param start  id that is mapped to index 0
+     * @return a list of size {@code largestId - start + 1}; an empty list when srcMap is empty or every id
+     *     is below {@code start}
+     */
+    public static <V> ArrayList<V> convertToArray(Map<Integer, V> srcMap, int start) {
+        if (srcMap.isEmpty()) {
+            // nothing to place; the previous Optional.get() on an empty key set threw NoSuchElementException
+            return new ArrayList<>();
+        }
+        int largestId = Collections.max(srcMap.keySet());
+        // Clamp at 0 so that a largest id below start yields an empty list instead of a negative nCopies count.
+        int slots = Math.max(0, largestId - start + 1);
+        ArrayList<V> result = new ArrayList<>(Collections.nCopies(slots, null)); // list of (largestId - start + 1) nulls
+        for (Map.Entry<Integer, V> entry : srcMap.entrySet()) {
+            int id = entry.getKey();
+            if (id < start) {
+                LOG.debug("Entry {} will be skipped it is too small {} ...", id, start);
+            } else {
+                result.set(id - start, entry.getValue());
+            }
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
deleted file mode 100755
index 47c039a..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.storm.utils;
-
-public interface WorkerBackpressureCallback {
-
-    void onEvent(Object obj);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
deleted file mode 100644
index 832448e..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerBackpressureThread extends Thread {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
-    private final Object trigger;
-    private final Object workerData;
-    private final WorkerBackpressureCallback callback;
-    private volatile boolean running = true;
-
-    public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
-        this.trigger = trigger;
-        this.workerData = workerData;
-        this.callback = callback;
-        this.setName("WorkerBackpressureThread");
-        this.setDaemon(true);
-        this.setUncaughtExceptionHandler(new BackpressureUncaughtExceptionHandler());
-    }
-
-    static public void notifyBackpressureChecker(final Object trigger) {
-        try {
-            synchronized (trigger) {
-                trigger.notifyAll();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void terminate() throws InterruptedException {
-        running = false;
-        interrupt();
-        join();
-    }
-
-    public void run() {
-        while (running) {
-            try {
-                synchronized (trigger) {
-                    trigger.wait(100);
-                }
-                callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
-            } catch (InterruptedException interEx) {
-                // ignored, we are shutting down.
-            }
-        }
-    }
-}
-
-class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
-
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-        // note that exception that happens during connecting to ZK has been ignored in the callback implementation
-        LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
-        Runtime.getRuntime().exit(1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index 7db9aa5..34adbc4 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -36,6 +36,7 @@ import java.util.Set;
 
 import org.apache.storm.Config;
 import org.apache.storm.utils.Utils;
+import org.apache.storm.validation.ConfigValidationAnnotations.ValidatorParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +97,38 @@ public class ConfigValidation {
         }
     }
 
+    /**
+     * Checks if the named type derives from the specified Class.
+     */
+    public static class DerivedTypeValidator extends Validator {
+
+        private final Class<?> baseType;
+
+        /**
+         * Reads the required base type from the validator parameters.
+         *
+         * @param params validator parameters; the entry under ValidatorParams.BASE_TYPE must be a Class
+         */
+        public DerivedTypeValidator(Map<String, Object> params) {
+            this.baseType = (Class<?>) params.get(ValidatorParams.BASE_TYPE);
+        }
+
+        @Override
+        public void validateField(String name, Object actualTypeName) {
+            validateField(name, this.baseType, actualTypeName);
+        }
+
+        /**
+         * Verifies that the class named by {@code actualTypeName} can be assigned to {@code baseType}.
+         * A null {@code actualTypeName} is accepted without any check.
+         *
+         * @param name           name of the field being validated, used in the error message
+         * @param baseType       type the named class must derive from
+         * @param actualTypeName fully qualified class name (its toString() is passed to Class.forName)
+         * @throws IllegalArgumentException if the class cannot be found or does not derive from baseType
+         */
+        public static void validateField(String name, Class<?> baseType, Object actualTypeName) {
+            if (actualTypeName == null) {
+                return;
+            }
+            final Class<?> actualType;
+            try {
+                actualType = Class.forName(actualTypeName.toString());
+            } catch (ClassNotFoundException e) {
+                // Same message as before, but chain the cause so the original stack trace is not lost.
+                throw new IllegalArgumentException(e.getMessage(), e);
+            }
+            if (!baseType.isAssignableFrom(actualType)) {
+                throw new IllegalArgumentException("Field " + name + " must represent a type that derives from '" + baseType + "'. Specified type = " + actualTypeName);
+            }
+        }
+    }
+
     public static class StringValidator extends Validator {
 
         private HashSet<String> acceptedValues = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index 56dc574..791c145 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -39,6 +39,7 @@ public class ConfigValidationAnnotations {
     public static class ValidatorParams {
         static final String VALIDATOR_CLASS = "validatorClass";
         static final String TYPE = "type";
+        static final String BASE_TYPE = "baseType";
         static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
         static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
         static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
@@ -62,6 +63,14 @@ public class ConfigValidationAnnotations {
 
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.FIELD)
+    public @interface isDerivedFrom {
+        Class<?> validatorClass() default ConfigValidation.DerivedTypeValidator.class;
+        // NOTE(review): presumably passed to the validator as ValidatorParams.BASE_TYPE ("baseType") - confirm.
+        Class<?> baseType();
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.FIELD)
     public @interface isStringList {
         Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
index 8f99f68..ff209bd 100644
--- a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
+++ b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
@@ -87,7 +87,7 @@ public class TestJoinBolt {
 
     @Test
     public void testTrivial() throws Exception {
-        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
         TupleWindow window = makeTupleWindow(orderStream);
 
         JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "orders", orderFields[0])
@@ -101,7 +101,7 @@ public class TestJoinBolt {
 
     @Test
     public void testNestedKeys() throws Exception {
-        ArrayList<Tuple> userStream = makeNestedEventsStream("users", userFields, users);
+        ArrayList<Tuple> userStream = makeNestedEventsStream("users", userFields, users, "usersSpout");
         TupleWindow window = makeTupleWindow(userStream);
         JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", "outer.userId")
                 .select("outer.name, outer.city");
@@ -115,8 +115,8 @@ public class TestJoinBolt {
 
     @Test
     public void testProjection_FieldsWithStreamName() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> storeStream = makeStream("stores", storeFields, stores);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> storeStream = makeStream("stores", storeFields, stores, "storesSpout");
 
         TupleWindow window = makeTupleWindow(storeStream, userStream);
 
@@ -141,8 +141,8 @@ public class TestJoinBolt {
 
     @Test
     public void testInnerJoin() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
         TupleWindow window = makeTupleWindow(orderStream, userStream);
 
         JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
@@ -158,8 +158,8 @@ public class TestJoinBolt {
 
     @Test
     public void testLeftJoin() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
         TupleWindow window = makeTupleWindow(orderStream, userStream);
 
         JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
@@ -175,9 +175,9 @@ public class TestJoinBolt {
 
     @Test
     public void testThreeStreamInnerJoin() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
-        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
 
         TupleWindow window = makeTupleWindow(userStream, storesStream, cityStream);
 
@@ -196,9 +196,9 @@ public class TestJoinBolt {
 
     @Test
     public void testThreeStreamLeftJoin_1() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
-        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
 
         TupleWindow window = makeTupleWindow(userStream,  cityStream, storesStream);
 
@@ -216,9 +216,9 @@ public class TestJoinBolt {
 
     @Test
     public void testThreeStreamLeftJoin_2() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
-        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
 
         TupleWindow window = makeTupleWindow(userStream, cityStream, storesStream);
 
@@ -238,9 +238,9 @@ public class TestJoinBolt {
 
     @Test
     public void testThreeStreamMixedJoin() throws Exception {
-        ArrayList<Tuple> userStream = makeStream("users", userFields, users);
-        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
-        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+        ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+        ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+        ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
 
         TupleWindow window = makeTupleWindow(userStream,  cityStream, storesStream);
 
@@ -286,12 +286,12 @@ public class TestJoinBolt {
     }
 
 
-    private static ArrayList<Tuple> makeStream(String streamName, String[] fieldNames, Object[][] data) {
+    private static ArrayList<Tuple> makeStream(String streamName, String[] fieldNames, Object[][] data, String srcComponentName) {
         ArrayList<Tuple> result = new ArrayList<>();
         MockContext mockContext = new MockContext(fieldNames);
 
         for (Object[] record : data) {
-            TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), 0, streamName);
+            TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), srcComponentName, 0, streamName);
             result.add( rec );
         }
 
@@ -299,7 +299,8 @@ public class TestJoinBolt {
     }
 
 
-    private static ArrayList<Tuple> makeNestedEventsStream (String streamName, String[] fieldNames, Object[][] records) {
+    private static ArrayList<Tuple> makeNestedEventsStream (String streamName, String[] fieldNames, Object[][] records
+        , String srcComponentName) {
 
         MockContext mockContext = new MockContext(new String[]{"outer"} );
         ArrayList<Tuple> result = new ArrayList<>(records.length);
@@ -313,7 +314,7 @@ public class TestJoinBolt {
 
             ArrayList<Object> tupleValues = new ArrayList<>(1);
             tupleValues.add(recordMap);
-            TupleImpl tuple = new TupleImpl(mockContext, tupleValues, 0, streamName);
+            TupleImpl tuple = new TupleImpl(mockContext, tupleValues, srcComponentName, 0, streamName);
             result.add( tuple );
         }
 
@@ -341,7 +342,7 @@ public class TestJoinBolt {
         private final Fields fields;
 
         public MockContext(String[] fieldNames) {
-            super(null, null, null, null, null, null);
+            super(null, new HashMap<>(), null, null, null, null);
             this.fields = new Fields(fieldNames);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index bd20d7b..e2ef41e 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -38,8 +38,7 @@ public class StormClusterStateImplTest {
                                         ClusterUtils.ERRORS_SUBTREE, 
                                         ClusterUtils.BLOBSTORE_SUBTREE, 
                                         ClusterUtils.NIMBUSES_SUBTREE, 
-                                        ClusterUtils.LOGCONFIG_SUBTREE,
-                                        ClusterUtils.BACKPRESSURE_SUBTREE };
+                                        ClusterUtils.LOGCONFIG_SUBTREE};
 
     private IStateStorage storage;
     private ClusterStateContext context;
@@ -64,45 +63,5 @@ public class StormClusterStateImplTest {
             Mockito.verify(storage).mkdirs(path, null);
         }
     }
-
-    @Test
-    public void removeBackpressureDoesNotThrowTest() {
-        // setup to throw
-        Mockito.doThrow(new RuntimeException(new KeeperException.NoNodeException("foo")))
-               .when(storage)
-               .delete_node(Matchers.anyString());
-        try {
-            state.removeBackpressure("bogus-topo-id");
-            // teardown backpressure should have caught the exception
-            Mockito.verify(storage)
-                   .delete_node(ClusterUtils.backpressureStormRoot("bogus-topo-id"));
-        } catch (Exception e) {
-            Assert.fail("Exception thrown when it shouldn't have: " + e);
-        }
-    }
-
-    @Test
-    public void removeWorkerBackpressureDoesntAttemptForNonExistentZNodeTest() {
-        // setup to throw
-        Mockito.when(storage.node_exists(Matchers.anyString(), Matchers.anyBoolean()))
-               .thenReturn(false);
-
-        state.removeWorkerBackpressure("bogus-topo-id", "bogus-host", new Long(1234));
-
-        Mockito.verify(storage, Mockito.never())
-               .delete_node(Matchers.anyString());
-    }
-
-    @Test
-    public void removeWorkerBackpressureCleansForExistingZNodeTest() {
-        // setup to throw
-        Mockito.when(storage.node_exists(Matchers.anyString(), Matchers.anyBoolean()))
-               .thenReturn(true);
-
-        state.removeWorkerBackpressure("bogus-topo-id", "bogus-host", new Long(1234));
-
-        Mockito.verify(storage)
-               .delete_node(ClusterUtils.backpressurePath("bogus-topo-id", "bogus-host", new Long(1234)));
-    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 76fdf33..7d2d5d2 100644
--- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -51,7 +51,7 @@ import static org.junit.Assert.fail;
  * Unit tests for {@link WindowedBoltExecutor}
  */
 public class WindowedBoltExecutorTest {
-    
+
     private WindowedBoltExecutor executor;
     private TestWindowedBolt testWindowedBolt;
 
@@ -77,8 +77,8 @@ public class WindowedBoltExecutorTest {
         };
     }
 
-    private Tuple getTuple(String streamId, final Fields fields, Values values) {
-        return new TupleImpl(getContext(fields), values, 1, streamId) {
+    private Tuple getTuple(String streamId, final Fields fields, Values values, String srcComponent) {
+        return new TupleImpl(getContext(fields), values, srcComponent, 1, streamId) {
             @Override
             public GlobalStreamId getSourceGlobalStreamId() {
                 return new GlobalStreamId("s1", "default");
@@ -118,14 +118,14 @@ public class WindowedBoltExecutorTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void testExecuteWithoutTs() throws Exception {
-        executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+        executor.execute(getTuple("s1", new Fields("a"), new Values(1), "s1Src"));
     }
 
     @Test
     public void testExecuteWithTs() throws Exception {
         long[] timestamps = {603, 605, 607, 618, 626, 636};
         for (long ts : timestamps) {
-            executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+            executor.execute(getTuple("s1", new Fields("ts"), new Values(ts), "s1Src"));
         }
         //Thread.sleep(120);
         executor.waterMarkEventGenerator.run();
@@ -216,13 +216,13 @@ public class WindowedBoltExecutorTest {
         List<Tuple> tuples = new ArrayList<>(timestamps.length);
 
         for (long ts : timestamps) {
-            Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+            Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts), "s1Src");
             tuples.add(tuple);
             executor.execute(tuple);
-            
+
             //Update the watermark to this timestamp
             executor.waterMarkEventGenerator.run();
-        } 
+        }
         System.out.println(testWindowedBolt.tupleWindows);
         Tuple tuple = tuples.get(tuples.size() - 1);
         Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));

http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
deleted file mode 100644
index 7072e55..0000000
--- a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.dsl.ProducerType;
-import org.junit.Assert;
-import org.junit.Test;
-import junit.framework.TestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueBackpressureTest extends TestCase {
-    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class);
-
-    private final static int MESSAGES = 100;
-    private final static int CAPACITY = 128;
-    private final static double HIGH_WATERMARK = 0.6;
-    private final static double LOW_WATERMARK = 0.2;
-
-    @Test
-    public void testBackPressureCallback() throws Exception {
-
-        final DisruptorQueue queue = createQueue("testBackPressure", CAPACITY);
-        queue.setEnableBackpressure(true);
-        queue.setHighWaterMark(HIGH_WATERMARK);
-        queue.setLowWaterMark(LOW_WATERMARK);
-
-        final AtomicBoolean throttleOn = new AtomicBoolean(false);
-        // we need to record the cursor because the DisruptorQueue does not update the readPos during batch consuming
-        final AtomicLong consumerCursor = new AtomicLong(-1);
-
-        DisruptorBackpressureCallbackImpl cb = new DisruptorBackpressureCallbackImpl(queue, throttleOn, consumerCursor);
-        queue.registerBackpressureCallback(cb);
-
-        for (int i = 0; i < MESSAGES; i++) {
-            queue.publish(String.valueOf(i));
-        }
-
-
-        queue.consumeBatchWhenAvailable(new EventHandler<Object>() {
-            @Override
-            public void onEvent(Object o, long l, boolean b) throws Exception {
-                 consumerCursor.set(l);
-            }
-        });
-
-
-        Assert.assertEquals("Check the calling time of throttle on. ",
-                queue.getHighWaterMark(), cb.highWaterMarkCalledPopulation);
-        Assert.assertEquals("Checking the calling time of throttle off. ",
-                queue.getLowWaterMark(), cb.lowWaterMarkCalledPopulation);
-    }
-
-    class DisruptorBackpressureCallbackImpl implements DisruptorBackpressureCallback {
-        // the queue's population when the high water mark callback is called for the first time
-        public long highWaterMarkCalledPopulation = -1;
-        // the queue's population when the low water mark callback is called for the first time
-        public long lowWaterMarkCalledPopulation = -1;
-
-        DisruptorQueue queue;
-        AtomicBoolean throttleOn;
-        AtomicLong consumerCursor;
-
-        public DisruptorBackpressureCallbackImpl(DisruptorQueue queue, AtomicBoolean throttleOn,
-                                                 AtomicLong consumerCursor) {
-            this.queue = queue;
-            this.throttleOn = throttleOn;
-            this.consumerCursor = consumerCursor;
-        }
-
-        @Override
-        public void highWaterMark() throws Exception {
-            if (!throttleOn.get()) {
-                highWaterMarkCalledPopulation = queue.getMetrics().population() + queue.getMetrics().overflow();
-                throttleOn.set(true);
-            }
-        }
-
-        @Override
-        public void lowWaterMark() throws Exception {
-             if (throttleOn.get()) {
-                 lowWaterMarkCalledPopulation = queue.getMetrics().writePos() - consumerCursor.get() + queue.getMetrics().overflow();
-                 throttleOn.set(false);
-             }
-        }
-    }
-
-    private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
-    }
-}