Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:37 UTC
[2/9] storm git commit: STORM-2306 - Messaging subsystem redesign.
New Backpressure model.
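
For orientation before the diff: the watermark-callback backpressure of DisruptorQueue (deleted below) is replaced by producers that block inside a configurable wait strategy when the new JCQueue is full. A minimal sketch of the new API, using only the JCQueue methods introduced in this commit; waitStrategy, tuple, and handle() are assumed to come from the surrounding worker code:

    JCQueue q = new JCQueue("receive-queue", 1024, 0, 100, waitStrategy);

    q.publish(tuple);   // producer: spins in waitStrategy.idle() while the queue is full

    int consumed = q.consume(new JCQueue.Consumer() {
        @Override public void accept(Object event) { handle(event); }
        @Override public void flush() { /* flush downstream buffers, if any */ }
    });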
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
deleted file mode 100644
index 3779505..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.LiteBlockingWaitStrategy;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-import org.apache.storm.Config;
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A single consumer queue that uses the LMAX Disruptor. The key to the performance is
- * the ability to catch up to the producer by processing tuples in batches.
- */
-public class DisruptorQueue implements IStatefulObject {
- private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
- private static final Object INTERRUPT = new Object();
- private static final String PREFIX = "disruptor-";
- private static final FlusherPool FLUSHER = new FlusherPool();
-
- private static int getNumFlusherPoolThreads() {
- int numThreads = 100;
- try {
- Map<String, Object> conf = Utils.readStormConfig();
- numThreads = ObjectReader.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
- } catch (Exception e) {
- LOG.warn("Error while trying to read system config", e);
- }
- try {
- String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads));
- numThreads = Integer.parseInt(threads);
- } catch (Exception e) {
- LOG.warn("Error while parsing number of flusher pool threads", e);
- }
- LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", numThreads);
- return numThreads;
- }
-
- private static class FlusherPool {
- private static final String THREAD_PREFIX = "disruptor-flush";
- private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true);
- private ThreadPoolExecutor _exec;
- private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>();
- private HashMap<Long, TimerTask> _tt = new HashMap<>();
-
- public FlusherPool() {
- _exec = new ThreadPoolExecutor(1, getNumFlusherPoolThreads(), 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.DiscardPolicy());
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(THREAD_PREFIX + "-task-pool")
- .build();
- _exec.setThreadFactory(threadFactory);
- }
-
- public synchronized void start(Flusher flusher, final long flushInterval) {
- ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
- if (pending == null) {
- pending = new ArrayList<>();
- TimerTask t = new TimerTask() {
- @Override
- public void run() {
- invokeAll(flushInterval);
- }
- };
- _pendingFlush.put(flushInterval, pending);
- _timer.schedule(t, flushInterval, flushInterval);
- _tt.put(flushInterval, t);
- }
- pending.add(flusher);
- }
-
- private synchronized void invokeAll(long flushInterval) {
- ArrayList<Flusher> tasks = _pendingFlush.get(flushInterval);
- if (tasks != null) {
- for (Flusher f: tasks) {
- _exec.submit(f);
- }
- }
- }
-
- public synchronized void stop(Flusher flusher, long flushInterval) {
- ArrayList<Flusher> pending = _pendingFlush.get(flushInterval);
- if (pending != null) {
- pending.remove(flusher);
- if (pending.size() == 0) {
- _pendingFlush.remove(flushInterval);
- _tt.remove(flushInterval).cancel();
- }
- }
- }
- }
-
- private static class ObjectEventFactory implements EventFactory<AtomicReference<Object>> {
- @Override
- public AtomicReference<Object> newInstance() {
- return new AtomicReference<Object>();
- }
- }
-
- private interface ThreadLocalInserter {
- public void add(Object obj);
- public void forceBatch();
- public void flush(boolean block);
- }
-
- private class ThreadLocalJustInserter implements ThreadLocalInserter {
- private final ReentrantLock _flushLock;
- private final ConcurrentLinkedQueue<Object> _overflow;
-
- public ThreadLocalJustInserter() {
- _flushLock = new ReentrantLock();
- _overflow = new ConcurrentLinkedQueue<>();
- }
-
- //called by the main thread and should not block for an undefined period of time
- public synchronized void add(Object obj) {
- boolean inserted = false;
- if (_overflow.isEmpty()) {
- try {
- publishDirectSingle(obj, false);
- inserted = true;
- } catch (InsufficientCapacityException e) {
- //Ignored
- }
- }
-
- if (!inserted) {
- _overflowCount.incrementAndGet();
- _overflow.add(obj);
- }
-
- if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
- try {
- if (!_throttleOn) {
- _throttleOn = true;
- _cb.highWaterMark();
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception during calling highWaterMark callback!", e);
- }
- }
- }
-
- //May be called by a background thread
- public void forceBatch() {
- //NOOP
- }
-
- //May be called by a background thread
- public void flush(boolean block) {
- if (block) {
- _flushLock.lock();
- } else if (!_flushLock.tryLock()) {
- //Someone else is flushing, so don't do anything
- return;
- }
- try {
- while (!_overflow.isEmpty()) {
- publishDirectSingle(_overflow.peek(), block);
- _overflowCount.addAndGet(-1);
- _overflow.poll();
- }
- } catch (InsufficientCapacityException e) {
- //Ignored; we should not block
- } finally {
- _flushLock.unlock();
- }
- }
- }
-
- private class ThreadLocalBatcher implements ThreadLocalInserter {
- private final ReentrantLock _flushLock;
- private final ConcurrentLinkedQueue<ArrayList<Object>> _overflow;
- private ArrayList<Object> _currentBatch;
-
- public ThreadLocalBatcher() {
- _flushLock = new ReentrantLock();
- _overflow = new ConcurrentLinkedQueue<ArrayList<Object>>();
- _currentBatch = new ArrayList<Object>(_inputBatchSize);
- }
-
- //called by the main thread and should not block for an undefined period of time
- public synchronized void add(Object obj) {
- _currentBatch.add(obj);
- _overflowCount.incrementAndGet();
- if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) {
- try {
- if (!_throttleOn) {
- _throttleOn = true;
- _cb.highWaterMark();
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception during calling highWaterMark callback!", e);
- }
- }
- if (_currentBatch.size() >= _inputBatchSize) {
- boolean flushed = false;
- if (_overflow.isEmpty()) {
- try {
- publishDirect(_currentBatch, false);
- _overflowCount.addAndGet(0 - _currentBatch.size());
- _currentBatch.clear();
- flushed = true;
- } catch (InsufficientCapacityException e) {
- //Ignored; we will flush later
- }
- }
-
- if (!flushed) {
- _overflow.add(_currentBatch);
- _currentBatch = new ArrayList<Object>(_inputBatchSize);
- }
- }
- }
-
- //May be called by a background thread
- public synchronized void forceBatch() {
- if (!_currentBatch.isEmpty()) {
- _overflow.add(_currentBatch);
- _currentBatch = new ArrayList<Object>(_inputBatchSize);
- }
- }
-
- //May be called by a background thread
- public void flush(boolean block) {
- if (block) {
- _flushLock.lock();
- } else if (!_flushLock.tryLock()) {
- //Someone else is flushing, so don't do anything
- return;
- }
- try {
- while (!_overflow.isEmpty()) {
- publishDirect(_overflow.peek(), block);
- _overflowCount.addAndGet(0 - _overflow.poll().size());
- }
- } catch (InsufficientCapacityException e) {
- //Ignored; we should not block
- } finally {
- _flushLock.unlock();
- }
- }
- }
-
- private class Flusher implements Runnable {
- private AtomicBoolean _isFlushing = new AtomicBoolean(false);
- private final long _flushInterval;
-
- public Flusher(long flushInterval, String name) {
- _flushInterval = flushInterval;
- }
-
- public void run() {
- if (_isFlushing.compareAndSet(false, true)) {
- for (ThreadLocalInserter batcher: _batchers.values()) {
- batcher.forceBatch();
- batcher.flush(true);
- }
- _isFlushing.set(false);
- }
- }
-
- public void start() {
- FLUSHER.start(this, _flushInterval);
- }
-
- public void close() {
- FLUSHER.stop(this, _flushInterval);
- }
- }
-
- /**
- * This inner class provides methods to access the metrics of the disruptor queue.
- */
- public class QueueMetrics {
- private final RateTracker _rateTracker = new RateTracker(10000, 10);
-
- public long writePos() {
- return _buffer.getCursor();
- }
-
- public long readPos() {
- return _consumer.get();
- }
-
- public long overflow() {
- return _overflowCount.get();
- }
-
- public long population() {
- return writePos() - readPos();
- }
-
- public long capacity() {
- return _buffer.getBufferSize();
- }
-
- public float pctFull() {
- return (1.0F * population() / capacity());
- }
-
- public Object getState() {
- Map state = new HashMap<String, Object>();
-
- // get readPos then writePos so it's never an under-estimate
- long rp = readPos();
- long wp = writePos();
-
- final long tuplePop = tuplePopulation.get();
-
- final double arrivalRateInSecs = _rateTracker.reportRate();
-
- //Assume the queue is stable, in which the arrival rate is equal to the consumption rate.
- // If this assumption does not hold, the calculation of sojourn time should also consider
- // departure rate according to Queuing Theory.
- final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
-
- state.put("capacity", capacity());
- state.put("population", wp - rp);
- state.put("tuple_population", tuplePop);
- state.put("write_pos", wp);
- state.put("read_pos", rp);
- state.put("arrival_rate_secs", arrivalRateInSecs);
- state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
- state.put("overflow", _overflowCount.get());
-
- return state;
- }
-
- public void notifyArrivals(long counts) {
- _rateTracker.notify(counts);
- tuplePopulation.getAndAdd(counts);
- }
-
- public void notifyDepartures(long counts) {
- tuplePopulation.getAndAdd(-counts);
- }
-
- public void close() {
- _rateTracker.close();
- }
- }
-
- private final RingBuffer<AtomicReference<Object>> _buffer;
- private final Sequence _consumer;
- private final SequenceBarrier _barrier;
- private final int _inputBatchSize;
- private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
- private final Flusher _flusher;
- private final QueueMetrics _metrics;
-
- private String _queueName = "";
- private DisruptorBackpressureCallback _cb = null;
- private int _highWaterMark = 0;
- private int _lowWaterMark = 0;
- private boolean _enableBackpressure = false;
- private final AtomicLong _overflowCount = new AtomicLong(0);
- private final AtomicLong tuplePopulation = new AtomicLong(0);
- private volatile boolean _throttleOn = false;
-
- public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
- this._queueName = PREFIX + queueName;
- WaitStrategy wait;
- if (readTimeout <= 0) {
- wait = new LiteBlockingWaitStrategy();
- } else {
- wait = new TimeoutBlockingWaitStrategy(readTimeout, TimeUnit.MILLISECONDS);
- }
-
- _buffer = RingBuffer.create(type, new ObjectEventFactory(), size, wait);
- _consumer = new Sequence();
- _barrier = _buffer.newBarrier();
- _buffer.addGatingSequences(_consumer);
- _metrics = new QueueMetrics();
- //The batch size can be no larger than half the full queue size.
- //This is mostly to avoid contention issues.
- _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
-
- _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
- _flusher.start();
- }
-
- public DisruptorQueue(String queueName, int size, long readTimeout, int inputBatchSize, long flushInterval) {
- this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, flushInterval);
- }
-
- public String getName() {
- return _queueName;
- }
-
- public boolean isFull() {
- return (_metrics.population() + _overflowCount.get()) >= _metrics.capacity();
- }
-
- public void haltWithInterrupt() {
- try {
- publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
- _flusher.close();
- _metrics.close();
- } catch (InsufficientCapacityException e) {
- //This should be impossible
- throw new RuntimeException(e);
- }
- }
-
- public void consumeBatch(EventHandler<Object> handler) {
- if (_metrics.population() > 0) {
- consumeBatchWhenAvailable(handler);
- }
- }
-
- public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
- try {
- final long nextSequence = _consumer.get() + 1;
- long availableSequence = _barrier.waitFor(nextSequence);
-
- if (availableSequence >= nextSequence) {
- consumeBatchToCursor(availableSequence, handler);
- }
- } catch (TimeoutException te) {
- //Ignored
- } catch (AlertException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
- for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
- try {
- AtomicReference<Object> mo = _buffer.get(curr);
- Object o = mo.getAndSet(null);
- if (o == INTERRUPT) {
- throw new InterruptedException("Disruptor processing interrupted");
- } else if (o == null) {
- LOG.error("NULL found in {}:{}", this.getName(), cursor);
- } else {
- _metrics.notifyDepartures(getTupleCount(o));
- handler.onEvent(o, curr, curr == cursor);
- if (_enableBackpressure && _cb != null && (_metrics.writePos() - curr + _overflowCount.get()) <= _lowWaterMark) {
- try {
- if (_throttleOn) {
- _throttleOn = false;
- _cb.lowWaterMark();
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception during calling lowWaterMark callback!");
- }
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- _consumer.set(cursor);
- }
-
- public void registerBackpressureCallback(DisruptorBackpressureCallback cb) {
- this._cb = cb;
- }
-
- private static Long getId() {
- return Thread.currentThread().getId();
- }
-
- private long getTupleCount(Object obj) {
- //a published object could be an instance of either AddressedTuple, ArrayList<AddressedTuple>, or HashMap<Integer, ArrayList<TaskMessage>>.
- long tupleCount;
- if (obj instanceof ArrayList) {
- tupleCount = ((ArrayList) obj).size();
- } else if (obj instanceof HashMap) {
- tupleCount = 0;
- for (Object value:((HashMap) obj).values()) {
- tupleCount += ((ArrayList) value).size();
- }
- } else {
- tupleCount = 1;
- }
- return tupleCount;
- }
-
- private void publishDirectSingle(Object obj, boolean block) throws InsufficientCapacityException {
- long at;
- long numberOfTuples;
- if (block) {
- at = _buffer.next();
- } else {
- at = _buffer.tryNext();
- }
- AtomicReference<Object> m = _buffer.get(at);
- m.set(obj);
- _buffer.publish(at);
- numberOfTuples = getTupleCount(obj);
- _metrics.notifyArrivals(numberOfTuples);
- }
-
- private void publishDirect(ArrayList<Object> objs, boolean block) throws InsufficientCapacityException {
- int size = objs.size();
- if (size > 0) {
- long end;
- if (block) {
- end = _buffer.next(size);
- } else {
- end = _buffer.tryNext(size);
- }
- long begin = end - (size - 1);
- long at = begin;
- long numberOfTuples = 0;
- for (Object obj: objs) {
- AtomicReference<Object> m = _buffer.get(at);
- m.set(obj);
- at++;
- numberOfTuples += getTupleCount(obj);
- }
- _metrics.notifyArrivals(numberOfTuples);
- _buffer.publish(begin, end);
- }
- }
-
- public void publish(Object obj) {
- Long id = getId();
- ThreadLocalInserter batcher = _batchers.get(id);
- if (batcher == null) {
- //This thread is the only one ever creating this, so this is safe
- if (_inputBatchSize > 1) {
- batcher = new ThreadLocalBatcher();
- } else {
- batcher = new ThreadLocalJustInserter();
- }
- _batchers.put(id, batcher);
- }
- batcher.add(obj);
- batcher.flush(false);
- }
-
- @Override
- public Object getState() {
- return _metrics.getState();
- }
-
- public DisruptorQueue setHighWaterMark(double highWaterMark) {
- this._highWaterMark = (int)(_metrics.capacity() * highWaterMark);
- return this;
- }
-
- public DisruptorQueue setLowWaterMark(double lowWaterMark) {
- this._lowWaterMark = (int)(_metrics.capacity() * lowWaterMark);
- return this;
- }
-
- public int getHighWaterMark() {
- return this._highWaterMark;
- }
-
- public int getLowWaterMark() {
- return this._lowWaterMark;
- }
-
- public DisruptorQueue setEnableBackpressure(boolean enableBackpressure) {
- this._enableBackpressure = enableBackpressure;
- return this;
- }
-
- //This method enables the metrics to be accessed from outside of the DisruptorQueue class
- public QueueMetrics getMetrics() {
- return _metrics;
- }
-
- public boolean getThrottleOn() {
- return _throttleOn;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
new file mode 100644
index 0000000..26b0ac4
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class JCQueue implements IStatefulObject {
+ private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+
+ public static final Object INTERRUPT = new Object();
+
+ private final ExitCondition continueRunning = () -> true;
+
+ private interface Inserter {
+ // blocking call that can be interrupted using Thread.interrupt()
+ void publish(Object obj) throws InterruptedException;
+ boolean tryPublish(Object obj);
+
+ void flush() throws InterruptedException;
+ boolean tryFlush();
+ }
+
+ /* Thread safe. Same instance can be used across multiple threads */
+ private static class DirectInserter implements Inserter {
+ private JCQueue q;
+
+ public DirectInserter(JCQueue q) {
+ this.q = q;
+ }
+
+ /** Blocking call that can be interrupted via Thread.interrupt() */
+ @Override
+ public void publish(Object obj) throws InterruptedException {
+ boolean inserted = q.tryPublishInternal(obj);
+ int idleCount = 0;
+ while (!inserted) {
+ q.metrics.notifyInsertFailure();
+ if (idleCount==0) { // check avoids multiple log msgs when in an idle loop
+ LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
+ }
+
+ idleCount = q.backPressureWaitStrategy.idle(idleCount);
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ inserted = q.tryPublishInternal(obj);
+ }
+
+ }
+
+ /** Non-blocking call. Return value indicates success/failure. */
+ @Override
+ public boolean tryPublish(Object obj) {
+ boolean inserted = q.tryPublishInternal(obj);
+ if (!inserted) {
+ q.metrics.notifyInsertFailure();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void flush() throws InterruptedException {
+ return;
+ }
+
+ @Override
+ public boolean tryFlush() {
+ return true;
+ }
+ } // class DirectInserter
+
+ /* Not thread safe. Have one instance per producer thread or synchronize externally */
+ private static class BatchInserter implements Inserter {
+ private JCQueue q;
+ private final int batchSz;
+ private ArrayList<Object> currentBatch;
+
+ public BatchInserter(JCQueue q, int batchSz) {
+ this.q = q;
+ this.batchSz = batchSz;
+ this.currentBatch = new ArrayList<>(batchSz + 1);
+ }
+
+ /** Blocking call - retries until the element is successfully added */
+ @Override
+ public void publish(Object obj) throws InterruptedException {
+ currentBatch.add(obj);
+ if (currentBatch.size() >= batchSz) {
+ flush();
+ }
+ }
+
+ /** Non-blocking call. Return value indicates success/failure. */
+ @Override
+ public boolean tryPublish(Object obj) {
+ if (currentBatch.size() >= batchSz) {
+ if (!tryFlush()) {
+ return false;
+ }
+ }
+ currentBatch.add(obj);
+ return true;
+ }
+
+ /** Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received.
+ * Uses backpressure wait strategy. */
+ @Override
+ public void flush() throws InterruptedException {
+ if (currentBatch.isEmpty()) {
+ return;
+ }
+ int publishCount = q.tryPublishInternal(currentBatch);
+ int retryCount = 0;
+ while (publishCount == 0) { // retry till at least 1 element is drained
+ q.metrics.notifyInsertFailure();
+ if (retryCount==0) { // check avoids multiple log msgs when in an idle loop
+ LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", q.getName());
+ }
+ retryCount = q.backPressureWaitStrategy.idle(retryCount);
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ publishCount = q.tryPublishInternal(currentBatch);
+ }
+ currentBatch.subList(0, publishCount).clear();
+ }
+
+ /** Non-blocking call. Tries to flush as many elements as possible. Returns true if at least one element
+ * of a non-empty currentBatch was flushed or if currentBatch is empty. Returns false otherwise. */
+ @Override
+ public boolean tryFlush() {
+ if (currentBatch.isEmpty()) {
+ return true;
+ }
+ int publishCount = q.tryPublishInternal(currentBatch);
+ if (publishCount == 0) {
+ q.metrics.notifyInsertFailure();
+ return false;
+ } else {
+ currentBatch.subList(0, publishCount).clear();
+ return true;
+ }
+ }
+ } // class BatchInserter
+
+ /**
+ * This inner class provides methods to access the metrics of the JCQueue.
+ */
+ public class QueueMetrics {
+ private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
+ private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
+ private final AtomicLong droppedMessages = new AtomicLong(0);
+
+ public long population() {
+ return recvQueue.size();
+ }
+
+ public long capacity() {
+ return recvQueue.capacity();
+ }
+
+ public Object getState() {
+ HashMap<String, Object> state = new HashMap<>();
+
+ final double arrivalRateInSecs = arrivalsTracker.reportRate();
+
+ long tuplePop = population();
+
+ // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
+ // If this assumption does not hold, the calculation of sojourn time should also consider
+ // departure rate according to Queuing Theory.
+ final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+
+ long cap = capacity();
+ float pctFull = (1.0F * tuplePop / cap);
+
+ state.put("capacity", cap);
+ state.put("pct_full", pctFull);
+ state.put("population", tuplePop);
+
+ state.put("arrival_rate_secs", arrivalRateInSecs);
+ state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
+ state.put("insert_failures", insertFailuresTracker.reportRate());
+ state.put("dropped_messages", droppedMessages);
+ state.put("overflow", overflowQ.size());
+ return state;
+ }
+
+ public void notifyArrivals(long counts) {
+ arrivalsTracker.notify(counts);
+ }
+
+ public void notifyInsertFailure() {
+ insertFailuresTracker.notify(1);
+ }
+
+ public void notifyDroppedMsg() {
+ droppedMessages.incrementAndGet();
+ }
+
+ public void close() {
+ arrivalsTracker.close();
+ insertFailuresTracker.close();
+ }
+
+ }
+
+ private final MpscArrayQueue<Object> recvQueue;
+ private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
+ private final int overflowLimit; // ensures overflowQ.size() <= overflowLimit; a value of 0 disables the limit (unbounded overflow)
+
+
+ private final int producerBatchSz;
+ private final DirectInserter directInserter = new DirectInserter(this);
+
+ private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
+
+ private final JCQueue.QueueMetrics metrics;
+
+ private String queueName;
+ private final IWaitStrategy backPressureWaitStrategy;
+
+ public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy) {
+ this.queueName = queueName;
+ this.overflowLimit = overflowLimit;
+ this.recvQueue = new MpscArrayQueue<>(size);
+ this.overflowQ = new MpscUnboundedArrayQueue<>(size);
+
+ this.metrics = new JCQueue.QueueMetrics();
+
+ //The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
+ this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
+ this.backPressureWaitStrategy = backPressureWaitStrategy;
+ }
+
+ public String getName() {
+ return queueName;
+ }
+
+
+ public boolean haltWithInterrupt() {
+ boolean res = tryPublishInternal(INTERRUPT);
+ metrics.close();
+ return res;
+ }
+
+
+ /**
+ * Non-blocking. Returns immediately if Q is empty. Returns the number of elements consumed from Q.
+ */
+ public int consume(JCQueue.Consumer consumer) {
+ return consume(consumer, continueRunning);
+ }
+
+ /**
+ * Non-blocking. Returns immediately if Q is empty. Runs until Q is empty or exitCond.keepRunning() returns false.
+ * Returns the number of elements consumed from Q.
+ */
+ public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
+ try {
+ return consumeImpl(consumer, exitCond);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int size() { return recvQueue.size() + overflowQ.size(); }
+
+ /**
+ * Non-blocking. Returns immediately if Q is empty. Returns the number of elements consumed from Q.
+ * @param exitCond drains until keepRunning() returns false
+ */
+ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
+ int drainCount = 0;
+ while ( exitCond.keepRunning() ) {
+ Object tuple = recvQueue.poll();
+ if (tuple == null) {
+ break;
+ }
+ consumer.accept(tuple);
+ ++drainCount;
+ }
+
+ int overflowDrainCount = 0;
+ int limit = overflowQ.size();
+ while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd condition prevents getting stuck consuming the overflow
+ Object tuple = overflowQ.poll();
+ ++overflowDrainCount;
+ consumer.accept(tuple);
+ }
+ int total = drainCount + overflowDrainCount;
+ if (total > 0) {
+ consumer.flush();
+ }
+ return total;
+ }
+
+ // Non-blocking. Returns true/false indicating success/failure. Fails if full.
+ private boolean tryPublishInternal(Object obj) {
+ if (recvQueue.offer(obj)) {
+ metrics.notifyArrivals(1);
+ return true;
+ }
+ return false;
+ }
+
+ // Non-blocking. Returns the count of inserts that succeeded.
+ private int tryPublishInternal(ArrayList<Object> objs) {
+ MessagePassingQueue.Supplier<Object> supplier =
+ new MessagePassingQueue.Supplier<Object>() {
+ int i = 0;
+
+ @Override
+ public Object get() {
+ return objs.get(i++);
+ }
+ };
+ int count = recvQueue.fill(supplier, objs.size());
+ metrics.notifyArrivals(count);
+ return count;
+ }
+
+ private Inserter getInserter() {
+ Inserter inserter;
+ if (producerBatchSz > 1) {
+ inserter = thdLocalBatcher.get();
+ if (inserter == null) {
+ BatchInserter b = new BatchInserter(this, producerBatchSz);
+ inserter = b;
+ thdLocalBatcher.set(b);
+ }
+ } else {
+ inserter = directInserter;
+ }
+ return inserter;
+ }
+
+ /**
+ * Blocking call. Retries till it can successfully publish the obj. Can be interrupted via Thread.interrupt().
+ */
+ public void publish(Object obj) throws InterruptedException {
+ Inserter inserter = getInserter();
+ inserter.publish(obj);
+ }
+
+ /**
+ * Non-blocking call, returns false if full
+ **/
+ public boolean tryPublish(Object obj) {
+ Inserter inserter = getInserter();
+ return inserter.tryPublish(obj);
+ }
+
+ /** Non-blocking call. Bypasses any batching that may be enabled on the recvQueue. Intended for sending flush/metrics tuples */
+ public boolean tryPublishDirect(Object obj) {
+ return tryPublishInternal(obj);
+ }
+
+ /**
+ * Un-batched write to overflowQ. Should only be called by WorkerTransfer.
+ * Returns false if overflowLimit has been reached.
+ */
+ public boolean tryPublishToOverflow(Object obj) {
+ if (overflowLimit>0 && overflowQ.size() >= overflowLimit) {
+ return false;
+ }
+ overflowQ.add(obj);
+ return true;
+ }
+
+ public void recordMsgDrop() {
+ getMetrics().notifyDroppedMsg();
+ }
+
+ public boolean isEmptyOverflow() {
+ return overflowQ.isEmpty();
+ }
+
+ public int getOverflowCount() {
+ return overflowQ.size();
+ }
+
+ public int getQueuedCount() {
+ return recvQueue.size();
+ }
+
+ /**
+ * if(batchSz>1) : Blocking call. Does not return until at least 1 element is drained or Thread.interrupt() is received
+ * if(batchSz==1) : NO-OP. Returns immediately; doesn't throw.
+ */
+ public void flush() throws InterruptedException {
+ Inserter inserter = getInserter();
+ inserter.flush();
+ }
+
+ /**
+ * if(batchSz>1) : Non-Blocking call. Tries to flush as many as it can. Returns true if flushed at least 1.
+ * if(batchSz==1) : This is a NO-OP. Returns true immediately.
+ */
+ public boolean tryFlush() {
+ Inserter inserter = getInserter();
+ return inserter.tryFlush();
+ }
+
+ @Override
+ public Object getState() {
+ return metrics.getState();
+ }
+
+
+ //This method enables the metrics to be accessed from outside of the JCQueue class
+ public JCQueue.QueueMetrics getMetrics() {
+ return metrics;
+ }
+
+ public interface Consumer extends org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+ void accept(Object event);
+
+ void flush() throws InterruptedException;
+ }
+
+
+ public interface ExitCondition {
+ boolean keepRunning();
+ }
+}
\ No newline at end of file
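As a usage note (not part of the patch): ExitCondition lets the caller bound the drain loop, for example against a shutdown flag. Variable names here are hypothetical:

    AtomicBoolean active = new AtomicBoolean(true);
    // keepRunning() is the single method of JCQueue.ExitCondition, so a lambda works.
    int drained = q.consume(consumer, () -> active.get());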
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java b/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
index ab14c49..614f25c 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/MutableLong.java
@@ -33,7 +33,7 @@ public class MutableLong {
}
public long increment() {
- return increment(1);
+ return ++val;
}
public long increment(long amt) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
index f50947a..a1deb04 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java
@@ -76,6 +76,10 @@ public class ObjectReader {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
+ public static Long getLong(Object o) {
+ return getLong(o, null);
+ }
+
public static Long getLong(Object o, Long defaultValue) {
if (null == o) {
return defaultValue;
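The new single-argument overload simply defaults to null, so callers can distinguish "absent" from any numeric value. The config key below is hypothetical:

    Long limit = ObjectReader.getLong(conf.get("some.key"));  // null when the key is absent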
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
index 89dedd4..65f051e 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/RotatingMap.java
@@ -109,7 +109,7 @@ public class RotatingMap<K, V> {
}
- public Object remove(K key) {
+ public V remove(K key) {
for(HashMap<K, V> bucket: _buckets) {
if(bucket.containsKey(key)) {
return bucket.remove(key);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java
index 142c432..80ecf5d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java
@@ -145,11 +145,15 @@ public class Time {
}
public static void sleep(long ms) throws InterruptedException {
- sleepUntil(currentTimeMillis()+ms);
+ if(ms>0) {
+ sleepUntil(currentTimeMillis() + ms);
+ }
}
public static void sleepNanos(long nanos) throws InterruptedException {
- sleepUntilNanos(nanoTime() + nanos);
+ if(nanos>0) {
+ sleepUntilNanos(nanoTime() + nanos);
+ }
}
public static void sleepSecs (long secs) throws InterruptedException {
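With the guards above, non-positive durations now return immediately instead of computing a deadline in the past. For illustration:

    Time.sleep(0);       // returns immediately
    Time.sleepNanos(-1); // returns immediately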
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
index f14136c..f1d30a5 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
@@ -6,135 +6,138 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
-import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransferDrainer {
- private Map<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
- private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
-
- public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
- for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
- addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
- }
- }
-
- public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
-
- for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
- NodeInfo hostPort = entry.getKey();
- IConnection connection = connections.get(hostPort);
- if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
- Iterator<TaskMessage> iter = getBundleIterator(bundle);
- if (null != iter && iter.hasNext()) {
- connection.send(iter);
+ private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
+ private Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap();
+
+ // Cache the msgs grouped by destination task
+ public void add(TaskMessage taskMsg) {
+ int destId = taskMsg.task();
+ ArrayList<TaskMessage> msgs = bundles.get(destId);
+ if (msgs == null) {
+ msgs = new ArrayList<>();
+ bundles.put(destId, msgs);
}
- } else {
- LOG.warn("Connection is not available for hostPort {}", hostPort);
- }
+ msgs.add(taskMsg);
}
- }
-
- private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
- for (Integer task : this.bundles.keySet()) {
- NodeInfo hostPort = taskToNode.get(task);
- if (hostPort != null) {
- for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
- addListRefToMap(bundleMap, hostPort, chunk);
+
+ public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
+ HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+ for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
+ NodeInfo node = entry.getKey();
+ IConnection conn = connections.get(node);
+ if (conn != null) {
+ ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
+ Iterator<TaskMessage> iter = getBundleIterator(bundle);
+ if (null != iter && iter.hasNext()) {
+ conn.send(iter);
+ }
+ entry.getValue().clear();
+ } else {
+ LOG.warn("Connection not available for hostPort {}", node);
+ }
}
- } else {
- LOG.warn("No remote destination available for task {}", task);
- }
}
- return bundleMap;
- }
- private <T> void addListRefToMap(Map<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
- T key, ArrayList<TaskMessage> tuples) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
+ private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+ HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>();
- if (null == bundle) {
- bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundleMap.put(key, bundle);
+ for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) {
+ if (entry.getValue().isEmpty()) {
+ continue;
+ }
+ NodeInfo node = taskToNode.get(entry.getKey());
+ if (node != null) {
+ ArrayList<ArrayList<TaskMessage>> msgs = result.get(node);
+ if (msgs == null) {
+ msgs = new ArrayList<>();
+ result.put(node, msgs);
+ }
+ msgs.add(entry.getValue());
+ } else {
+ LOG.warn("No remote destination available for task {}", entry.getKey());
+ }
+ }
+ return result;
}
- if (null != tuples && tuples.size() > 0) {
- bundle.add(tuples);
- }
- }
+ private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
- private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-
- if (null == bundle) {
- return null;
- }
-
- return new Iterator<TaskMessage> () {
-
- private int offset = 0;
- private int size = 0;
- {
- for (ArrayList<TaskMessage> list : bundle) {
- size += list.size();
+ if (null == bundle) {
+ return null;
}
- }
-
- private int bundleOffset = 0;
- private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-
- @Override
- public boolean hasNext() {
- return offset < size;
- }
-
- @Override
- public TaskMessage next() {
- TaskMessage msg;
- if (iter.hasNext()) {
- msg = iter.next();
- } else {
- bundleOffset++;
- iter = bundle.get(bundleOffset).iterator();
- msg = iter.next();
- }
- if (null != msg) {
- offset++;
+
+ return new Iterator<TaskMessage>() {
+
+ private int offset = 0;
+ private int size = 0;
+ private int bundleOffset = 0;
+ private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
+
+ {
+ for (ArrayList<TaskMessage> list : bundle) {
+ size += list.size();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return offset < size;
+ }
+
+ @Override
+ public TaskMessage next() {
+ TaskMessage msg;
+ if (iter.hasNext()) {
+ msg = iter.next();
+ } else {
+ bundleOffset++;
+ iter = bundle.get(bundleOffset).iterator();
+ msg = iter.next();
+ }
+ if (null != msg) {
+ offset++;
+ }
+ return msg;
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("not supported");
+ }
+ };
+ }
+
+
+ public void clear() {
+ for (ArrayList<TaskMessage> taskMessages : bundles.values()) {
+ taskMessages.clear();
}
- return msg;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("not supported");
- }
- };
- }
-
- public void clear() {
- bundles.clear();
- }
+ }
}
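Typical flow with the reworked drainer, matching the signatures above (taskMessage, taskToNode, and connections are assumed to come from the worker's transfer path):

    TransferDrainer drainer = new TransferDrainer();
    drainer.add(taskMessage);               // cached, keyed by destination task
    drainer.send(taskToNode, connections);  // grouped per node, then transmitted
    drainer.clear();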
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index e20e2ee..26d267e 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -341,20 +341,22 @@ public class Utils {
* @return the newly created thread
* @see Thread
*/
- public static SmartThread asyncLoop(final Callable afn,
- boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
- int priority, final boolean isFactory, boolean startImmediately,
- String threadName) {
+ public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+ int priority, final boolean isFactory, boolean startImmediately,
+ String threadName) {
SmartThread thread = new SmartThread(new Runnable() {
public void run() {
- Object s;
try {
- Callable fn = isFactory ? (Callable) afn.call() : afn;
- while ((s = fn.call()) instanceof Long) {
- Time.sleepSecs((Long) s);
+ final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
+ while (true) {
+ final Long s = fn.call();
+ if (s==null) // then stop running it
+ break;
+ if (s>0)
+ Time.sleep(s);
}
} catch (Throwable t) {
- if (exceptionCauseIsInstanceOf(
+ if (Utils.exceptionCauseIsInstanceOf(
InterruptedException.class, t)) {
LOG.info("Async loop interrupted!");
return;
@@ -370,7 +372,7 @@ public class Utils {
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Async loop died!", e);
- exitProcess(1, "Async loop died!");
+ Utils.exitProcess(1, "Async loop died!");
}
});
}
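Note the contract change above: the callable's Long return is now slept in milliseconds (Time.sleep) rather than seconds (Time.sleepSecs), and returning null ends the loop. A sketch against the full signature, with doWork()/done() as hypothetical stand-ins:

    SmartThread loop = Utils.asyncLoop(
        () -> { doWork(); return done() ? null : 100L; },  // null stops the loop; 100L sleeps 100 ms
        false, null, Thread.NORM_PRIORITY, false, true, "work-loop");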
@@ -1552,4 +1554,20 @@ public class Utils {
}
return ret;
}
+
+ public static <V> ArrayList<V> convertToArray(Map<Integer, V> srcMap, int start) {
+ Set<Integer> ids = srcMap.keySet();
+ Integer largestId = ids.stream().max(Integer::compareTo).get();
+ int end = largestId - start;
+ ArrayList<V> result = new ArrayList<>(Collections.nCopies(end+1 , null)); // creates array[largestId+1] filled with nulls
+ for( Map.Entry<Integer, V> entry : srcMap.entrySet() ) {
+ int id = entry.getKey();
+ if (id < start) {
+ LOG.debug("Entry {} will be skipped it is too small {} ...", id, start);
+ } else {
+ result.set(id - start, entry.getValue());
+ }
+ }
+ return result;
+ }
}
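A worked example of the new helper: keys become list indices offset by start, gaps stay null, and keys below start are skipped:

    Map<Integer, String> src = new HashMap<>();
    src.put(2, "a");
    src.put(4, "b");
    ArrayList<String> out = Utils.convertToArray(src, 2);
    // out => [a, null, b]   (index = key - start)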
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
deleted file mode 100755
index 47c039a..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureCallback.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.storm.utils;
-
-public interface WorkerBackpressureCallback {
-
- void onEvent(Object obj);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java b/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
deleted file mode 100644
index 832448e..0000000
--- a/storm-client/src/jvm/org/apache/storm/utils/WorkerBackpressureThread.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerBackpressureThread extends Thread {
-
- private static final Logger LOG = LoggerFactory.getLogger(WorkerBackpressureThread.class);
- private final Object trigger;
- private final Object workerData;
- private final WorkerBackpressureCallback callback;
- private volatile boolean running = true;
-
- public WorkerBackpressureThread(Object trigger, Object workerData, WorkerBackpressureCallback callback) {
- this.trigger = trigger;
- this.workerData = workerData;
- this.callback = callback;
- this.setName("WorkerBackpressureThread");
- this.setDaemon(true);
- this.setUncaughtExceptionHandler(new BackpressureUncaughtExceptionHandler());
- }
-
- static public void notifyBackpressureChecker(final Object trigger) {
- try {
- synchronized (trigger) {
- trigger.notifyAll();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void terminate() throws InterruptedException {
- running = false;
- interrupt();
- join();
- }
-
- public void run() {
- while (running) {
- try {
- synchronized (trigger) {
- trigger.wait(100);
- }
- callback.onEvent(workerData); // check all executors and update zk backpressure throttle for the worker if needed
- } catch (InterruptedException interEx) {
- // ignored, we are shutting down.
- }
- }
- }
-}
-
-class BackpressureUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(BackpressureUncaughtExceptionHandler.class);
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- // note that exception that happens during connecting to ZK has been ignored in the callback implementation
- LOG.error("Received error or exception in WorkerBackpressureThread.. terminating the worker...", e);
- Runtime.getRuntime().exit(1);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index 7db9aa5..34adbc4 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -36,6 +36,7 @@ import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.utils.Utils;
+import org.apache.storm.validation.ConfigValidationAnnotations.ValidatorParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +97,38 @@ public class ConfigValidation {
}
}
+ /**
+ * Checks if the named type derives from the specified Class
+ */
+ public static class DerivedTypeValidator extends Validator {
+
+ private Class<?> baseType;
+
+ public DerivedTypeValidator(Map<String, Object> params) {
+ this.baseType = (Class<?>) params.get(ValidatorParams.BASE_TYPE);
+ }
+
+ @Override
+ public void validateField(String name, Object actualTypeName) {
+ validateField(name, this.baseType, actualTypeName);
+ }
+
+ public static void validateField(String name, Class<?> baseType, Object actualTypeName) {
+ if (actualTypeName == null) {
+ return;
+ }
+ try {
+ Class<?> actualType = Class.forName(actualTypeName.toString());
+ if (baseType.isAssignableFrom(actualType)) {
+ return;
+ }
+ throw new IllegalArgumentException("Field " + name + " must represent a type that derives from '" + baseType + "'. Specified type = " + actualTypeName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ }
+
public static class StringValidator extends Validator {
private HashSet<String> acceptedValues = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
index 56dc574..791c145 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidationAnnotations.java
@@ -39,6 +39,7 @@ public class ConfigValidationAnnotations {
public static class ValidatorParams {
static final String VALIDATOR_CLASS = "validatorClass";
static final String TYPE = "type";
+ static final String BASE_TYPE = "baseType";
static final String ENTRY_VALIDATOR_CLASSES = "entryValidatorClasses";
static final String KEY_VALIDATOR_CLASSES = "keyValidatorClasses";
static final String VALUE_VALIDATOR_CLASSES = "valueValidatorClasses";
@@ -62,6 +63,14 @@ public class ConfigValidationAnnotations {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
+ public @interface isDerivedFrom {
+ Class<?> validatorClass() default ConfigValidation.DerivedTypeValidator.class;
+
+ Class<?> baseType();
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.FIELD)
public @interface isStringList {
Class<?> validatorClass() default ConfigValidation.ListEntryTypeValidator.class;
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
index 8f99f68..ff209bd 100644
--- a/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
+++ b/storm-client/test/jvm/org/apache/storm/bolt/TestJoinBolt.java
@@ -87,7 +87,7 @@ public class TestJoinBolt {
@Test
public void testTrivial() throws Exception {
- ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+ ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
TupleWindow window = makeTupleWindow(orderStream);
JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "orders", orderFields[0])
@@ -101,7 +101,7 @@ public class TestJoinBolt {
@Test
public void testNestedKeys() throws Exception {
- ArrayList<Tuple> userStream = makeNestedEventsStream("users", userFields, users);
+ ArrayList<Tuple> userStream = makeNestedEventsStream("users", userFields, users, "usersSpout");
TupleWindow window = makeTupleWindow(userStream);
JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", "outer.userId")
.select("outer.name, outer.city");
@@ -115,8 +115,8 @@ public class TestJoinBolt {
@Test
public void testProjection_FieldsWithStreamName() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> storeStream = makeStream("stores", storeFields, stores);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> storeStream = makeStream("stores", storeFields, stores, "storesSpout");
TupleWindow window = makeTupleWindow(storeStream, userStream);
@@ -141,8 +141,8 @@ public class TestJoinBolt {
@Test
public void testInnerJoin() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
TupleWindow window = makeTupleWindow(orderStream, userStream);
JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
@@ -158,8 +158,8 @@ public class TestJoinBolt {
@Test
public void testLeftJoin() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> orderStream = makeStream("orders", orderFields, orders, "ordersSpout");
TupleWindow window = makeTupleWindow(orderStream, userStream);
JoinBolt bolt = new JoinBolt(JoinBolt.Selector.STREAM, "users", userFields[0])
@@ -175,9 +175,9 @@ public class TestJoinBolt {
@Test
public void testThreeStreamInnerJoin() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
- ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+ ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
TupleWindow window = makeTupleWindow(userStream, storesStream, cityStream);
@@ -196,9 +196,9 @@ public class TestJoinBolt {
@Test
public void testThreeStreamLeftJoin_1() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
- ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+ ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
TupleWindow window = makeTupleWindow(userStream, cityStream, storesStream);
@@ -216,9 +216,9 @@ public class TestJoinBolt {
@Test
public void testThreeStreamLeftJoin_2() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
- ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+ ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
TupleWindow window = makeTupleWindow(userStream, cityStream, storesStream);
@@ -238,9 +238,9 @@ public class TestJoinBolt {
@Test
public void testThreeStreamMixedJoin() throws Exception {
- ArrayList<Tuple> userStream = makeStream("users", userFields, users);
- ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores);
- ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities);
+ ArrayList<Tuple> userStream = makeStream("users", userFields, users, "usersSpout");
+ ArrayList<Tuple> storesStream = makeStream("stores", storeFields, stores, "storesSpout");
+ ArrayList<Tuple> cityStream = makeStream("cities", cityFields, cities, "citiesSpout");
TupleWindow window = makeTupleWindow(userStream, cityStream, storesStream);
@@ -286,12 +286,12 @@ public class TestJoinBolt {
}
- private static ArrayList<Tuple> makeStream(String streamName, String[] fieldNames, Object[][] data) {
+ private static ArrayList<Tuple> makeStream(String streamName, String[] fieldNames, Object[][] data, String srcComponentName) {
ArrayList<Tuple> result = new ArrayList<>();
MockContext mockContext = new MockContext(fieldNames);
for (Object[] record : data) {
- TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), 0, streamName);
+ TupleImpl rec = new TupleImpl(mockContext, Arrays.asList(record), srcComponentName, 0, streamName);
result.add( rec );
}
@@ -299,7 +299,8 @@ public class TestJoinBolt {
}
- private static ArrayList<Tuple> makeNestedEventsStream (String streamName, String[] fieldNames, Object[][] records) {
+ private static ArrayList<Tuple> makeNestedEventsStream (String streamName, String[] fieldNames, Object[][] records
+ , String srcComponentName) {
MockContext mockContext = new MockContext(new String[]{"outer"} );
ArrayList<Tuple> result = new ArrayList<>(records.length);
@@ -313,7 +314,7 @@ public class TestJoinBolt {
ArrayList<Object> tupleValues = new ArrayList<>(1);
tupleValues.add(recordMap);
- TupleImpl tuple = new TupleImpl(mockContext, tupleValues, 0, streamName);
+ TupleImpl tuple = new TupleImpl(mockContext, tupleValues, srcComponentName, 0, streamName);
result.add( tuple );
}
@@ -341,7 +342,7 @@ public class TestJoinBolt {
private final Fields fields;
public MockContext(String[] fieldNames) {
- super(null, null, null, null, null, null);
+ super(null, new HashMap<>(), null, null, null, null);
this.fields = new Fields(fieldNames);
}
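
The one signature change running through this file is the extra srcComponentName argument on the TupleImpl constructor, visible in makeStream above: the source component is now passed explicitly instead of being resolved from the context. A minimal helper sketch, assuming a GeneralTopologyContext is at hand (the helper name and values are illustrative):

    // Build a test tuple against the new constructor; task id 0 matches the
    // fixtures in this file.
    private static Tuple makeTuple(GeneralTopologyContext context, String srcComponent,
                                   String streamId, Object... values) {
        return new TupleImpl(context, Arrays.asList(values), srcComponent, 0, streamId);
    }
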
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
index bd20d7b..e2ef41e 100644
--- a/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
+++ b/storm-client/test/jvm/org/apache/storm/cluster/StormClusterStateImplTest.java
@@ -38,8 +38,7 @@ public class StormClusterStateImplTest {
ClusterUtils.ERRORS_SUBTREE,
ClusterUtils.BLOBSTORE_SUBTREE,
ClusterUtils.NIMBUSES_SUBTREE,
- ClusterUtils.LOGCONFIG_SUBTREE,
- ClusterUtils.BACKPRESSURE_SUBTREE };
+ ClusterUtils.LOGCONFIG_SUBTREE};
private IStateStorage storage;
private ClusterStateContext context;
@@ -64,45 +63,5 @@ public class StormClusterStateImplTest {
Mockito.verify(storage).mkdirs(path, null);
}
}
-
- @Test
- public void removeBackpressureDoesNotThrowTest() {
- // setup to throw
- Mockito.doThrow(new RuntimeException(new KeeperException.NoNodeException("foo")))
- .when(storage)
- .delete_node(Matchers.anyString());
- try {
- state.removeBackpressure("bogus-topo-id");
- // teardown backpressure should have caught the exception
- Mockito.verify(storage)
- .delete_node(ClusterUtils.backpressureStormRoot("bogus-topo-id"));
- } catch (Exception e) {
- Assert.fail("Exception thrown when it shouldn't have: " + e);
- }
- }
-
- @Test
- public void removeWorkerBackpressureDoesntAttemptForNonExistentZNodeTest() {
- // setup to throw
- Mockito.when(storage.node_exists(Matchers.anyString(), Matchers.anyBoolean()))
- .thenReturn(false);
-
- state.removeWorkerBackpressure("bogus-topo-id", "bogus-host", new Long(1234));
-
- Mockito.verify(storage, Mockito.never())
- .delete_node(Matchers.anyString());
- }
-
- @Test
- public void removeWorkerBackpressureCleansForExistingZNodeTest() {
- // setup to throw
- Mockito.when(storage.node_exists(Matchers.anyString(), Matchers.anyBoolean()))
- .thenReturn(true);
-
- state.removeWorkerBackpressure("bogus-topo-id", "bogus-host", new Long(1234));
-
- Mockito.verify(storage)
- .delete_node(ClusterUtils.backpressurePath("bogus-topo-id", "bogus-host", new Long(1234)));
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 76fdf33..7d2d5d2 100644
--- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -51,7 +51,7 @@ import static org.junit.Assert.fail;
* Unit tests for {@link WindowedBoltExecutor}
*/
public class WindowedBoltExecutorTest {
-
+
private WindowedBoltExecutor executor;
private TestWindowedBolt testWindowedBolt;
@@ -77,8 +77,8 @@ public class WindowedBoltExecutorTest {
};
}
- private Tuple getTuple(String streamId, final Fields fields, Values values) {
- return new TupleImpl(getContext(fields), values, 1, streamId) {
+ private Tuple getTuple(String streamId, final Fields fields, Values values, String srcComponent) {
+ return new TupleImpl(getContext(fields), values, srcComponent, 1, streamId) {
@Override
public GlobalStreamId getSourceGlobalStreamId() {
return new GlobalStreamId("s1", "default");
@@ -118,14 +118,14 @@ public class WindowedBoltExecutorTest {
@Test(expected = IllegalArgumentException.class)
public void testExecuteWithoutTs() throws Exception {
- executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
+ executor.execute(getTuple("s1", new Fields("a"), new Values(1), "s1Src"));
}
@Test
public void testExecuteWithTs() throws Exception {
long[] timestamps = {603, 605, 607, 618, 626, 636};
for (long ts : timestamps) {
- executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
+ executor.execute(getTuple("s1", new Fields("ts"), new Values(ts), "s1Src"));
}
//Thread.sleep(120);
executor.waterMarkEventGenerator.run();
@@ -216,13 +216,13 @@ public class WindowedBoltExecutorTest {
List<Tuple> tuples = new ArrayList<>(timestamps.length);
for (long ts : timestamps) {
- Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
+ Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts), "s1Src");
tuples.add(tuple);
executor.execute(tuple);
-
+
//Update the watermark to this timestamp
executor.waterMarkEventGenerator.run();
- }
+ }
System.out.println(testWindowedBolt.tupleWindows);
Tuple tuple = tuples.get(tuples.size() - 1);
Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
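
The "$late" verification above relies on the executor's watermark advancing past an out-of-order timestamp. A minimal sketch of that rule, assuming the generator computes watermark = max(ts) - lag and flags tuples at or below it; the lag and the trailing out-of-order timestamp here are illustrative, since the test's actual values are configured in setup code outside this hunk:

    // Illustrative only: mirrors the watermark rule, not the executor's code.
    static boolean[] lateFlags(long[] ts, long lag) {
        boolean[] late = new boolean[ts.length];
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < ts.length; i++) {
            late[i] = ts[i] <= watermark;   // at or below the watermark => "$late"
            maxTs = Math.max(maxTs, ts[i]);
            watermark = maxTs - lag;
        }
        return late;
    }

With lag 5, lateFlags(new long[]{603, 605, 607, 618, 626, 636, 600}, 5) flags only the trailing 600: by then the watermark has advanced to 631.
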
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
deleted file mode 100644
index 7072e55..0000000
--- a/storm-client/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.dsl.ProducerType;
-import org.junit.Assert;
-import org.junit.Test;
-import junit.framework.TestCase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DisruptorQueueBackpressureTest extends TestCase {
- private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class);
-
- private final static int MESSAGES = 100;
- private final static int CAPACITY = 128;
- private final static double HIGH_WATERMARK = 0.6;
- private final static double LOW_WATERMARK = 0.2;
-
- @Test
- public void testBackPressureCallback() throws Exception {
-
- final DisruptorQueue queue = createQueue("testBackPressure", CAPACITY);
- queue.setEnableBackpressure(true);
- queue.setHighWaterMark(HIGH_WATERMARK);
- queue.setLowWaterMark(LOW_WATERMARK);
-
- final AtomicBoolean throttleOn = new AtomicBoolean(false);
- // record the cursor ourselves, since DisruptorQueue does not advance readPos while consuming a batch
- final AtomicLong consumerCursor = new AtomicLong(-1);
-
- DisruptorBackpressureCallbackImpl cb = new DisruptorBackpressureCallbackImpl(queue, throttleOn, consumerCursor);
- queue.registerBackpressureCallback(cb);
-
- for (int i = 0; i < MESSAGES; i++) {
- queue.publish(String.valueOf(i));
- }
-
-
- queue.consumeBatchWhenAvailable(new EventHandler<Object>() {
- @Override
- public void onEvent(Object o, long l, boolean b) throws Exception {
- consumerCursor.set(l);
- }
- });
-
-
- Assert.assertEquals("Check the calling time of throttle on. ",
- queue.getHighWaterMark(), cb.highWaterMarkCalledPopulation);
- Assert.assertEquals("Checking the calling time of throttle off. ",
- queue.getLowWaterMark(), cb.lowWaterMarkCalledPopulation);
- }
-
- class DisruptorBackpressureCallbackImpl implements DisruptorBackpressureCallback {
- // the queue's population when the high water mark callback is called for the first time
- public long highWaterMarkCalledPopulation = -1;
- // the queue's population when the low water mark callback is called for the first time
- public long lowWaterMarkCalledPopulation = -1;
-
- DisruptorQueue queue;
- AtomicBoolean throttleOn;
- AtomicLong consumerCursor;
-
- public DisruptorBackpressureCallbackImpl(DisruptorQueue queue, AtomicBoolean throttleOn,
- AtomicLong consumerCursor) {
- this.queue = queue;
- this.throttleOn = throttleOn;
- this.consumerCursor = consumerCursor;
- }
-
- @Override
- public void highWaterMark() throws Exception {
- if (!throttleOn.get()) {
- highWaterMarkCalledPopulation = queue.getMetrics().population() + queue.getMetrics().overflow();
- throttleOn.set(true);
- }
- }
-
- @Override
- public void lowWaterMark() throws Exception {
- if (throttleOn.get()) {
- lowWaterMarkCalledPopulation = queue.getMetrics().writePos() - consumerCursor.get() + queue.getMetrics().overflow();
- throttleOn.set(false);
- }
- }
- }
-
- private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L);
- }
-}
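
For the record, the deleted assertions compared the queue population at callback time against the configured marks. A minimal sketch of how those marks relate to the constants above, assuming the old queue derived them as floor(capacity * fraction) — an assumption on our part, since DisruptorQueue is removed by this commit:

    int capacity = 128;                          // CAPACITY in the deleted test
    int highWaterMark = (int) (capacity * 0.6);  // 76: throttle-on expected at this population
    int lowWaterMark  = (int) (capacity * 0.2);  // 25: throttle-off expected at this population
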